You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ma...@apache.org on 2017/06/01 21:41:51 UTC
[32/44] metron git commit: METRON-965: In the case where we specify
the sync policy in the HDFS Writer,
we do not properly clone it and end up syncing for every record. Closes
apache/incubator-metron#596
METRON-965: In the case where we specify the sync policy in the HDFS Writer, we do not properly clone it and end up syncing for every record. Closes apache/incubator-metron#596
Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/64473d4e
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/64473d4e
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/64473d4e
Branch: refs/heads/Metron_0.4.0
Commit: 64473d4e8399e71d0ac1a81b6e78e72b982e42f8
Parents: fee758b
Author: cstella <ce...@gmail.com>
Authored: Sat May 20 09:57:13 2017 -0400
Committer: cstella <ce...@gmail.com>
Committed: Sat May 20 09:57:13 2017 -0400
----------------------------------------------------------------------
.../writer/hdfs/ClonedSyncPolicyCreator.java | 47 ++++++++++++++++++++
.../apache/metron/writer/hdfs/HdfsWriter.java | 6 +--
.../hdfs/ClonedSyncPolicyCreatorTest.java | 42 +++++++++++++++++
3 files changed, 92 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/metron/blob/64473d4e/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/ClonedSyncPolicyCreator.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/ClonedSyncPolicyCreator.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/ClonedSyncPolicyCreator.java
new file mode 100644
index 0000000..4d32fc9
--- /dev/null
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/ClonedSyncPolicyCreator.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.writer.hdfs;
+
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
+import org.apache.metron.common.utils.SerDeUtils;
+import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
+
+public class ClonedSyncPolicyCreator implements SyncPolicyCreator {
+ SyncPolicy syncPolicy; // template policy; create() returns a deserialized copy of it, never this instance
+ public ClonedSyncPolicyCreator(SyncPolicy policy) {
+ syncPolicy = policy;
+ }
+
+ @Override
+ public SyncPolicy create(String sensor, WriterConfiguration config) {
+ try {
+ // Deep-clone the configured SyncPolicy by round-tripping it through SerDeUtils (kryo
+ // serialization) so that every call hands back a fresh policy object. A fresh policy is
+ // needed because each handler (one handler per task & sensor type and one handler per file)
+ // must have its own sync policy; reusing a single policy across handlers is a bad idea.
+ // SyncPolicy does not implement Cloneable, hence the clone via serialization. The sensor
+ // and config arguments are not consulted. Note: this would be expensive if it were in the
+ // critical path, but it should be called infrequently (noted as once per sync; TODO confirm).
+ byte[] serializedForm = SerDeUtils.toBytes(syncPolicy);
+ return SerDeUtils.fromBytes(serializedForm, SyncPolicy.class);
+ }
+ catch (Exception e) {
+ throw new IllegalStateException(e.getMessage(), e); // rethrow unchecked, keeping the original exception as the cause
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/64473d4e/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
index e0ab502..c5d1e4f 100644
--- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
@@ -24,6 +24,7 @@ import org.apache.metron.common.dsl.StellarFunctions;
import org.apache.metron.common.dsl.VariableResolver;
import org.apache.metron.common.stellar.StellarCompiler;
import org.apache.metron.common.stellar.StellarProcessor;
+import org.apache.metron.common.utils.SerDeUtils;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.Tuple;
import org.apache.metron.common.configuration.writer.WriterConfiguration;
@@ -37,8 +38,7 @@ import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
import org.apache.storm.hdfs.common.rotation.RotationAction;
import org.json.simple.JSONObject;
-import java.io.IOException;
-import java.io.Serializable;
+import java.io.*;
import java.util.*;
import java.util.function.Function;
@@ -85,7 +85,7 @@ public class HdfsWriter implements BulkMessageWriter<JSONObject>, Serializable {
this.fileNameFormat.prepare(stormConfig,topologyContext);
if(syncPolicy != null) {
//if the user has specified the sync policy, we don't want to override their wishes.
- syncPolicyCreator = (source,config) -> syncPolicy;
+ syncPolicyCreator = new ClonedSyncPolicyCreator(syncPolicy);
}
else {
//if the user has not, then we want to have the sync policy depend on the batch size.
http://git-wip-us.apache.org/repos/asf/metron/blob/64473d4e/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/ClonedSyncPolicyCreatorTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/ClonedSyncPolicyCreatorTest.java b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/ClonedSyncPolicyCreatorTest.java
new file mode 100644
index 0000000..092bf0f
--- /dev/null
+++ b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/ClonedSyncPolicyCreatorTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.writer.hdfs;
+
+import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
+import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ClonedSyncPolicyCreatorTest {
+
+ @Test
+ public void testClonedPolicy() {
+ CountSyncPolicy basePolicy = new CountSyncPolicy(5);
+ ClonedSyncPolicyCreator creator = new ClonedSyncPolicyCreator(basePolicy);
+ // Ensure the cloned policy still honors the count-of-5 contract: mark() is false for the first 4 calls and true on the 5th.
+ SyncPolicy clonedPolicy = creator.create("blah", null);
+ for(int i = 0;i < 4;++i) {
+ Assert.assertFalse(clonedPolicy.mark(null, i));
+ }
+ Assert.assertTrue(clonedPolicy.mark(null, 5)); // 5th mark() call on this clone
+ // Re-clone and ensure the new copy's first mark() is false again, i.e. it does not inherit the previous clone's count.
+ clonedPolicy = creator.create("blah", null);
+ Assert.assertFalse(clonedPolicy.mark(null, 0));
+ }
+}