You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2017/05/16 20:50:12 UTC

metron git commit: METRON-955: Make the default sync policy for HDFS Writer be based on the batch size (closes apache/incubator-metron#589)

Repository: metron
Updated Branches:
  refs/heads/master c1c212117 -> be0307659


METRON-955: Make the default sync policy for HDFS Writer be based on the batch size (closes apache/incubator-metron#589)


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/be030765
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/be030765
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/be030765

Branch: refs/heads/master
Commit: be03076599544f2baedb1b3010ad50625b2f32ae
Parents: c1c2121
Author: cstella <ce...@gmail.com>
Authored: Tue May 16 16:50:04 2017 -0400
Committer: cstella <ce...@gmail.com>
Committed: Tue May 16 16:50:04 2017 -0400

----------------------------------------------------------------------
 .../apache/metron/writer/hdfs/HdfsWriter.java   | 22 ++++++++++++-----
 .../metron/writer/hdfs/SourceHandler.java       | 14 +++++------
 .../metron/writer/hdfs/SyncPolicyCreator.java   | 25 ++++++++++++++++++++
 .../metron/writer/hdfs/HdfsWriterTest.java      |  4 ++--
 4 files changed, 50 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/be030765/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
index a86dfbc..e0ab502 100644
--- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
@@ -40,16 +40,18 @@ import org.json.simple.JSONObject;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.*;
+import java.util.function.Function;
 
 public class HdfsWriter implements BulkMessageWriter<JSONObject>, Serializable {
   List<RotationAction> rotationActions = new ArrayList<>();
   FileRotationPolicy rotationPolicy = new NoRotationPolicy();
-  SyncPolicy syncPolicy = new CountSyncPolicy(1); //sync every time, duh.
+  SyncPolicy syncPolicy;
   FileNameFormat fileNameFormat;
   Map<SourceHandlerKey, SourceHandler> sourceHandlerMap = new HashMap<>();
   int maxOpenFiles = 500;
   transient StellarProcessor stellarProcessor;
   transient Map stormConfig;
+  transient SyncPolicyCreator syncPolicyCreator;
 
 
   public HdfsWriter withFileNameFormat(FileNameFormat fileNameFormat){
@@ -81,6 +83,14 @@ public class HdfsWriter implements BulkMessageWriter<JSONObject>, Serializable {
     this.stormConfig = stormConfig;
     this.stellarProcessor = new StellarProcessor();
     this.fileNameFormat.prepare(stormConfig,topologyContext);
+    if(syncPolicy != null) {
+      //if the user has specified the sync policy, we don't want to override their wishes.
+      syncPolicyCreator = (source,config) -> syncPolicy;
+    }
+    else {
+      //if the user has not, then we want to have the sync policy depend on the batch size.
+      syncPolicyCreator = (source, config) -> new CountSyncPolicy(config == null?1:config.getBatchSize(source));
+    }
   }
 
 
@@ -92,18 +102,18 @@ public class HdfsWriter implements BulkMessageWriter<JSONObject>, Serializable {
                    ) throws Exception
   {
     BulkWriterResponse response = new BulkWriterResponse();
+
     // Currently treating all the messages in a group for pass/failure.
     try {
       // Messages can all result in different HDFS paths, because of Stellar Expressions, so we'll need to iterate through
       for(JSONObject message : messages) {
-        Map<String, Object> val = configurations.getSensorConfig(sourceType);
         String path = getHdfsPathExtension(
                 sourceType,
                 (String)configurations.getSensorConfig(sourceType).getOrDefault(IndexingConfigurations.OUTPUT_PATH_FUNCTION_CONF, ""),
                 message
         );
-        SourceHandler handler = getSourceHandler(sourceType, path);
-        handler.handle(message);
+        SourceHandler handler = getSourceHandler(sourceType, path, configurations);
+        handler.handle(message, sourceType, configurations, syncPolicyCreator);
       }
     } catch (Exception e) {
       response.addAllErrors(e, tuples);
@@ -142,7 +152,7 @@ public class HdfsWriter implements BulkMessageWriter<JSONObject>, Serializable {
     sourceHandlerMap.clear();
   }
 
-  synchronized SourceHandler getSourceHandler(String sourceType, String stellarResult) throws IOException {
+  synchronized SourceHandler getSourceHandler(String sourceType, String stellarResult, WriterConfiguration config) throws IOException {
     SourceHandlerKey key = new SourceHandlerKey(sourceType, stellarResult);
     SourceHandler ret = sourceHandlerMap.get(key);
     if(ret == null) {
@@ -151,7 +161,7 @@ public class HdfsWriter implements BulkMessageWriter<JSONObject>, Serializable {
       }
       ret = new SourceHandler(rotationActions,
                               rotationPolicy,
-                              syncPolicy,
+                              syncPolicyCreator.create(sourceType, config),
                               new PathExtensionFileNameFormat(key.getStellarResult(), fileNameFormat),
                               new SourceHandlerCallback(sourceHandlerMap, key));
       sourceHandlerMap.put(key, ret);

http://git-wip-us.apache.org/repos/asf/metron/blob/be030765/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java
index fa6d8da..d895465 100644
--- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java
@@ -24,9 +24,11 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.log4j.Logger;
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
 import org.apache.storm.hdfs.bolt.format.FileNameFormat;
 import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
 import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
+import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
 import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
 import org.apache.storm.hdfs.common.rotation.RotationAction;
 import org.json.simple.JSONObject;
@@ -34,6 +36,7 @@ import org.json.simple.JSONObject;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.*;
+import java.util.function.Function;
 
 public class SourceHandler {
   private static final Logger LOG = Logger.getLogger(SourceHandler.class);
@@ -61,13 +64,8 @@ public class SourceHandler {
     initialize();
   }
 
-  public void handle(List<JSONObject> messages) throws Exception{
-    for(JSONObject message : messages) {
-      handle(message);
-    }
-  }
 
-  protected void handle(JSONObject message) throws IOException {
+  protected void handle(JSONObject message, String sensor, WriterConfiguration config, SyncPolicyCreator syncPolicyCreator) throws IOException {
     byte[] bytes = (message.toJSONString() + "\n").getBytes();
     synchronized (this.writeLock) {
       out.write(bytes);
@@ -79,7 +77,9 @@ public class SourceHandler {
         } else {
           this.out.hsync();
         }
-        this.syncPolicy.reset();
+        //recreate the sync policy for the next batch just in case something changed in the config
+        //and the sync policy depends on the config.
+        this.syncPolicy = syncPolicyCreator.create(sensor, config);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/metron/blob/be030765/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SyncPolicyCreator.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SyncPolicyCreator.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SyncPolicyCreator.java
new file mode 100644
index 0000000..9b7d205
--- /dev/null
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SyncPolicyCreator.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.writer.hdfs;
+
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
+import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
+
+public interface SyncPolicyCreator {
+  SyncPolicy create(String sensor, WriterConfiguration config);
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/be030765/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java
index 6153ed2..0118a15 100644
--- a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java
+++ b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java
@@ -209,7 +209,7 @@ public class HdfsWriterTest {
     writer.init(new HashMap<String, String>(), createTopologyContext(),  config);
 
     for(int i = 0; i < maxFiles; i++) {
-      writer.getSourceHandler(SENSOR_NAME, Integer.toString(i));
+      writer.getSourceHandler(SENSOR_NAME, Integer.toString(i), null);
     }
   }
 
@@ -223,7 +223,7 @@ public class HdfsWriterTest {
     writer.init(new HashMap<String, String>(), createTopologyContext(),  config);
 
     for(int i = 0; i < maxFiles+1; i++) {
-      writer.getSourceHandler(SENSOR_NAME, Integer.toString(i));
+      writer.getSourceHandler(SENSOR_NAME, Integer.toString(i), null);
     }
   }