You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ma...@apache.org on 2017/06/01 21:41:51 UTC

[32/44] metron git commit: METRON-965: In the case where we specify the syncpolicy in the HDFS Writer, we do not properly clone and end up syncing for every record closes apache/incubator-metron#596

METRON-965: When a sync policy is specified in the HDFS Writer, it is not properly cloned, so we end up syncing for every record. closes apache/incubator-metron#596


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/64473d4e
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/64473d4e
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/64473d4e

Branch: refs/heads/Metron_0.4.0
Commit: 64473d4e8399e71d0ac1a81b6e78e72b982e42f8
Parents: fee758b
Author: cstella <ce...@gmail.com>
Authored: Sat May 20 09:57:13 2017 -0400
Committer: cstella <ce...@gmail.com>
Committed: Sat May 20 09:57:13 2017 -0400

----------------------------------------------------------------------
 .../writer/hdfs/ClonedSyncPolicyCreator.java    | 47 ++++++++++++++++++++
 .../apache/metron/writer/hdfs/HdfsWriter.java   |  6 +--
 .../hdfs/ClonedSyncPolicyCreatorTest.java       | 42 +++++++++++++++++
 3 files changed, 92 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/64473d4e/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/ClonedSyncPolicyCreator.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/ClonedSyncPolicyCreator.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/ClonedSyncPolicyCreator.java
new file mode 100644
index 0000000..4d32fc9
--- /dev/null
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/ClonedSyncPolicyCreator.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.writer.hdfs;
+
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
+import org.apache.metron.common.utils.SerDeUtils;
+import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
+
+public class ClonedSyncPolicyCreator implements SyncPolicyCreator {
+  SyncPolicy syncPolicy;
+  public ClonedSyncPolicyCreator(SyncPolicy policy) {
+    this.syncPolicy = policy;
+  }
+
+  /**
+   * Hands back a fresh deep copy of the template policy on every call.  Each handler (one per
+   * task & sensor type, and one per file) must own its sync policy, so the template is never
+   * shared.  SyncPolicy does not implement Cloneable, hence the copy is made by a round trip
+   * through SerDeUtils (kryo serialization).  That would be expensive on the critical path, but
+   * this is expected to be called infrequently (once per sync).
+   */
+  @Override
+  public SyncPolicy create(String sensor, WriterConfiguration config) {
+    final SyncPolicy copy;
+    try {
+      copy = SerDeUtils.fromBytes(SerDeUtils.toBytes(syncPolicy), SyncPolicy.class);
+    } catch (Exception e) {
+      throw new IllegalStateException(e.getMessage(), e);
+    }
+    return copy;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/64473d4e/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
index e0ab502..c5d1e4f 100644
--- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
@@ -24,6 +24,7 @@ import org.apache.metron.common.dsl.StellarFunctions;
 import org.apache.metron.common.dsl.VariableResolver;
 import org.apache.metron.common.stellar.StellarCompiler;
 import org.apache.metron.common.stellar.StellarProcessor;
+import org.apache.metron.common.utils.SerDeUtils;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.tuple.Tuple;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
@@ -37,8 +38,7 @@ import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
 import org.apache.storm.hdfs.common.rotation.RotationAction;
 import org.json.simple.JSONObject;
 
-import java.io.IOException;
-import java.io.Serializable;
+import java.io.*;
 import java.util.*;
 import java.util.function.Function;
 
@@ -85,7 +85,7 @@ public class HdfsWriter implements BulkMessageWriter<JSONObject>, Serializable {
     this.fileNameFormat.prepare(stormConfig,topologyContext);
     if(syncPolicy != null) {
       //if the user has specified the sync policy, we don't want to override their wishes.
-      syncPolicyCreator = (source,config) -> syncPolicy;
+      syncPolicyCreator = new ClonedSyncPolicyCreator(syncPolicy);
     }
     else {
       //if the user has not, then we want to have the sync policy depend on the batch size.

http://git-wip-us.apache.org/repos/asf/metron/blob/64473d4e/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/ClonedSyncPolicyCreatorTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/ClonedSyncPolicyCreatorTest.java b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/ClonedSyncPolicyCreatorTest.java
new file mode 100644
index 0000000..092bf0f
--- /dev/null
+++ b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/ClonedSyncPolicyCreatorTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.writer.hdfs;
+
+import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
+import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ClonedSyncPolicyCreatorTest {
+
+  @Test
+  public void testClonedPolicy() {
+    CountSyncPolicy basePolicy = new CountSyncPolicy(5);
+    ClonedSyncPolicyCreator creator = new ClonedSyncPolicyCreator(basePolicy);
+    //ensure cloned policy continues to work and adheres to the contract: mark on 5th call.
+    SyncPolicy clonedPolicy = creator.create("blah", null);
+    for(int i = 0;i < 4;++i) {
+      Assert.assertFalse(clonedPolicy.mark(null, i));
+    }
+    Assert.assertTrue(clonedPolicy.mark(null, 5));
+    //reclone policy and ensure it adheres to the original contract.
+    clonedPolicy = creator.create("blah", null);
+    Assert.assertFalse(clonedPolicy.mark(null, 0));
+  }
+}