You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@metron.apache.org by ce...@apache.org on 2017/05/16 20:50:12 UTC
metron git commit: METRON-955: Make the default sync policy for HDFS Writer be based on the batch size closes apache/incubator-metron#589
Repository: metron
Updated Branches:
refs/heads/master c1c212117 -> be0307659
METRON-955: Make the default sync policy for HDFS Writer be based on the batch size closes apache/incubator-metron#589
Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/be030765
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/be030765
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/be030765
Branch: refs/heads/master
Commit: be03076599544f2baedb1b3010ad50625b2f32ae
Parents: c1c2121
Author: cstella <ce...@gmail.com>
Authored: Tue May 16 16:50:04 2017 -0400
Committer: cstella <ce...@gmail.com>
Committed: Tue May 16 16:50:04 2017 -0400
----------------------------------------------------------------------
.../apache/metron/writer/hdfs/HdfsWriter.java | 22 ++++++++++++-----
.../metron/writer/hdfs/SourceHandler.java | 14 +++++------
.../metron/writer/hdfs/SyncPolicyCreator.java | 25 ++++++++++++++++++++
.../metron/writer/hdfs/HdfsWriterTest.java | 4 ++--
4 files changed, 50 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/metron/blob/be030765/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
index a86dfbc..e0ab502 100644
--- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
@@ -40,16 +40,18 @@ import org.json.simple.JSONObject;
import java.io.IOException;
import java.io.Serializable;
import java.util.*;
+import java.util.function.Function;
public class HdfsWriter implements BulkMessageWriter<JSONObject>, Serializable {
List<RotationAction> rotationActions = new ArrayList<>();
FileRotationPolicy rotationPolicy = new NoRotationPolicy();
- SyncPolicy syncPolicy = new CountSyncPolicy(1); //sync every time, duh.
+ SyncPolicy syncPolicy;
FileNameFormat fileNameFormat;
Map<SourceHandlerKey, SourceHandler> sourceHandlerMap = new HashMap<>();
int maxOpenFiles = 500;
transient StellarProcessor stellarProcessor;
transient Map stormConfig;
+ transient SyncPolicyCreator syncPolicyCreator;
public HdfsWriter withFileNameFormat(FileNameFormat fileNameFormat){
@@ -81,6 +83,14 @@ public class HdfsWriter implements BulkMessageWriter<JSONObject>, Serializable {
this.stormConfig = stormConfig;
this.stellarProcessor = new StellarProcessor();
this.fileNameFormat.prepare(stormConfig,topologyContext);
+ if(syncPolicy != null) {
+ //if the user has specified the sync policy, we don't want to override their wishes.
+ syncPolicyCreator = (source,config) -> syncPolicy;
+ }
+ else {
+ //if the user has not, then we want to have the sync policy depend on the batch size.
+ syncPolicyCreator = (source, config) -> new CountSyncPolicy(config == null?1:config.getBatchSize(source));
+ }
}
@@ -92,18 +102,18 @@ public class HdfsWriter implements BulkMessageWriter<JSONObject>, Serializable {
) throws Exception
{
BulkWriterResponse response = new BulkWriterResponse();
+
// Currently treating all the messages in a group for pass/failure.
try {
// Messages can all result in different HDFS paths, because of Stellar Expressions, so we'll need to iterate through
for(JSONObject message : messages) {
- Map<String, Object> val = configurations.getSensorConfig(sourceType);
String path = getHdfsPathExtension(
sourceType,
(String)configurations.getSensorConfig(sourceType).getOrDefault(IndexingConfigurations.OUTPUT_PATH_FUNCTION_CONF, ""),
message
);
- SourceHandler handler = getSourceHandler(sourceType, path);
- handler.handle(message);
+ SourceHandler handler = getSourceHandler(sourceType, path, configurations);
+ handler.handle(message, sourceType, configurations, syncPolicyCreator);
}
} catch (Exception e) {
response.addAllErrors(e, tuples);
@@ -142,7 +152,7 @@ public class HdfsWriter implements BulkMessageWriter<JSONObject>, Serializable {
sourceHandlerMap.clear();
}
- synchronized SourceHandler getSourceHandler(String sourceType, String stellarResult) throws IOException {
+ synchronized SourceHandler getSourceHandler(String sourceType, String stellarResult, WriterConfiguration config) throws IOException {
SourceHandlerKey key = new SourceHandlerKey(sourceType, stellarResult);
SourceHandler ret = sourceHandlerMap.get(key);
if(ret == null) {
@@ -151,7 +161,7 @@ public class HdfsWriter implements BulkMessageWriter<JSONObject>, Serializable {
}
ret = new SourceHandler(rotationActions,
rotationPolicy,
- syncPolicy,
+ syncPolicyCreator.create(sourceType, config),
new PathExtensionFileNameFormat(key.getStellarResult(), fileNameFormat),
new SourceHandlerCallback(sourceHandlerMap, key));
sourceHandlerMap.put(key, ret);
http://git-wip-us.apache.org/repos/asf/metron/blob/be030765/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java
index fa6d8da..d895465 100644
--- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java
@@ -24,9 +24,11 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.log4j.Logger;
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
+import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
import org.apache.storm.hdfs.common.rotation.RotationAction;
import org.json.simple.JSONObject;
@@ -34,6 +36,7 @@ import org.json.simple.JSONObject;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.*;
+import java.util.function.Function;
public class SourceHandler {
private static final Logger LOG = Logger.getLogger(SourceHandler.class);
@@ -61,13 +64,8 @@ public class SourceHandler {
initialize();
}
- public void handle(List<JSONObject> messages) throws Exception{
- for(JSONObject message : messages) {
- handle(message);
- }
- }
- protected void handle(JSONObject message) throws IOException {
+ protected void handle(JSONObject message, String sensor, WriterConfiguration config, SyncPolicyCreator syncPolicyCreator) throws IOException {
byte[] bytes = (message.toJSONString() + "\n").getBytes();
synchronized (this.writeLock) {
out.write(bytes);
@@ -79,7 +77,9 @@ public class SourceHandler {
} else {
this.out.hsync();
}
- this.syncPolicy.reset();
+ //recreate the sync policy for the next batch just in case something changed in the config
+ //and the sync policy depends on the config.
+ this.syncPolicy = syncPolicyCreator.create(sensor, config);
}
}
http://git-wip-us.apache.org/repos/asf/metron/blob/be030765/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SyncPolicyCreator.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SyncPolicyCreator.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SyncPolicyCreator.java
new file mode 100644
index 0000000..9b7d205
--- /dev/null
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SyncPolicyCreator.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.writer.hdfs;
+
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
+import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
+
+public interface SyncPolicyCreator { // Creates the SyncPolicy a SourceHandler uses, given the sensor name and writer configuration.
+ SyncPolicy create(String sensor, WriterConfiguration config); // NOTE(review): config may be null (HdfsWriterTest passes null); the result may be a shared instance (the user-configured syncPolicy) rather than a fresh one.
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/be030765/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java
index 6153ed2..0118a15 100644
--- a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java
+++ b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java
@@ -209,7 +209,7 @@ public class HdfsWriterTest {
writer.init(new HashMap<String, String>(), createTopologyContext(), config);
for(int i = 0; i < maxFiles; i++) {
- writer.getSourceHandler(SENSOR_NAME, Integer.toString(i));
+ writer.getSourceHandler(SENSOR_NAME, Integer.toString(i), null);
}
}
@@ -223,7 +223,7 @@ public class HdfsWriterTest {
writer.init(new HashMap<String, String>(), createTopologyContext(), config);
for(int i = 0; i < maxFiles+1; i++) {
- writer.getSourceHandler(SENSOR_NAME, Integer.toString(i));
+ writer.getSourceHandler(SENSOR_NAME, Integer.toString(i), null);
}
}