You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ni...@apache.org on 2019/10/30 12:24:22 UTC
[metron] branch feature/METRON-2088-support-hdp-3.1 updated:
METRON-2301 Building Against Wrong Storm Flux Version (nickwallen) closes
apache/metron#1544
This is an automated email from the ASF dual-hosted git repository.
nickallen pushed a commit to branch feature/METRON-2088-support-hdp-3.1
in repository https://gitbox.apache.org/repos/asf/metron.git
The following commit(s) were added to refs/heads/feature/METRON-2088-support-hdp-3.1 by this push:
new 01f6107 METRON-2301 Building Against Wrong Storm Flux Version (nickwallen) closes apache/metron#1544
01f6107 is described below
commit 01f6107438a615612c378b708677be778ed4dad8
Author: nickwallen <ni...@apache.org>
AuthorDate: Wed Oct 30 08:23:52 2019 -0400
METRON-2301 Building Against Wrong Storm Flux Version (nickwallen) closes apache/metron#1544
---
.../metron-common-storm/pom.xml | 2 +-
.../metron-elasticsearch-storm/pom.xml | 2 +-
metron-platform/metron-integration-test/pom.xml | 2 +-
.../components/FluxTopologyComponent.java | 69 +++++++++++-----------
metron-platform/metron-pcap-backend/pom.xml | 2 +-
.../metron-solr/metron-solr-storm/pom.xml | 2 +-
pom.xml | 4 +-
7 files changed, 41 insertions(+), 42 deletions(-)
diff --git a/metron-platform/metron-common-streaming/metron-common-storm/pom.xml b/metron-platform/metron-common-streaming/metron-common-storm/pom.xml
index 2b9fbed..4426802 100644
--- a/metron-platform/metron-common-streaming/metron-common-storm/pom.xml
+++ b/metron-platform/metron-common-streaming/metron-common-storm/pom.xml
@@ -72,7 +72,7 @@
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>flux-core</artifactId>
- <version>${global_flux_version}</version>
+ <version>${global_storm_version}</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
diff --git a/metron-platform/metron-elasticsearch/metron-elasticsearch-storm/pom.xml b/metron-platform/metron-elasticsearch/metron-elasticsearch-storm/pom.xml
index a84e45d..3e71aba 100644
--- a/metron-platform/metron-elasticsearch/metron-elasticsearch-storm/pom.xml
+++ b/metron-platform/metron-elasticsearch/metron-elasticsearch-storm/pom.xml
@@ -62,7 +62,7 @@
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>flux-core</artifactId>
- <version>${global_flux_version}</version>
+ <version>${global_storm_version}</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
diff --git a/metron-platform/metron-integration-test/pom.xml b/metron-platform/metron-integration-test/pom.xml
index 6bb8291..3906a8f 100644
--- a/metron-platform/metron-integration-test/pom.xml
+++ b/metron-platform/metron-integration-test/pom.xml
@@ -45,7 +45,7 @@
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>flux-core</artifactId>
- <version>${global_flux_version}</version>
+ <version>${global_storm_version}</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/FluxTopologyComponent.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/FluxTopologyComponent.java
index 1a1ceb0..85a05b0 100644
--- a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/FluxTopologyComponent.java
+++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/FluxTopologyComponent.java
@@ -17,22 +17,6 @@
*/
package org.apache.metron.integration.components;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-import java.lang.invoke.MethodHandles;
-import java.lang.reflect.InvocationTargetException;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.Comparator;
-import java.util.Map;
-import java.util.Properties;
-import java.util.regex.Pattern;
import org.apache.commons.io.FileUtils;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
@@ -55,6 +39,19 @@ import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.io.IOException;
+import java.io.StringReader;
+import java.lang.invoke.MethodHandles;
+import java.lang.reflect.InvocationTargetException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Comparator;
+import java.util.Map;
+import java.util.Properties;
+import java.util.regex.Pattern;
+
public class FluxTopologyComponent implements InMemoryComponent {
protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -249,7 +246,7 @@ public class FluxTopologyComponent implements InMemoryComponent {
}
private void startTopology(String topologyName, File topologyLoc, File templateFile, Properties properties) throws IOException, ClassNotFoundException, NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException, TException, NoSuchFieldException{
- TopologyDef topologyDef = loadYaml(topologyName, topologyLoc, templateFile, properties);
+ TopologyDef topologyDef = loadYaml(topologyLoc, templateFile, properties);
Config conf = FluxBuilder.buildConfig(topologyDef);
ExecutionContext context = new ExecutionContext(topologyDef, conf);
StormTopology topology = FluxBuilder.buildTopology(context);
@@ -267,26 +264,30 @@ public class FluxTopologyComponent implements InMemoryComponent {
}
}
- private static TopologyDef loadYaml(String topologyName, File yamlFile, File templateFile, Properties properties) throws IOException {
- File tmpFile = File.createTempFile(topologyName, "props");
- tmpFile.deleteOnExit();
+ /**
+ * Creates a Storm topology.
+ * @param yamlFile The Flux file defining the topology.
+ * @param templateFile The template file used by the Mpack to create the topology's properties. For example, 'enrichment.properties.j2'.
+ * @param properties The topology properties.
+ * @return The Storm topology.
+ * @throws IOException
+ */
+ private static TopologyDef loadYaml(File yamlFile, File templateFile, Properties properties) throws IOException {
+ Properties topologyProperties;
if (templateFile != null) {
- try (Writer propWriter = new OutputStreamWriter(new FileOutputStream(tmpFile), StandardCharsets.UTF_8)){
- String templateContents = FileUtils.readFileToString(templateFile);
- for(Map.Entry prop: properties.entrySet()) {
- String replacePattern = String.format("{{%s}}", prop.getKey());
- templateContents = templateContents.replaceAll(Pattern.quote(replacePattern), (String) prop.getValue());
- }
- propWriter.write(templateContents);
- propWriter.flush();
- return FluxParser.parseFile(yamlFile.getAbsolutePath(), false, true, tmpFile.getAbsolutePath(), false);
+ // use the MPack template file (like 'enrichment.properties.j2') to generate the topology properties
+ String templateContents = FileUtils.readFileToString(templateFile);
+ for(Map.Entry prop: properties.entrySet()) {
+ String replacePattern = String.format("{{%s}}", prop.getKey());
+ templateContents = templateContents.replaceAll(Pattern.quote(replacePattern), (String) prop.getValue());
}
+ topologyProperties = new Properties();
+ topologyProperties.load(new StringReader(templateContents));
+ return FluxParser.parseFile(yamlFile.getAbsolutePath(), false, true, topologyProperties, false);
+
} else {
- try (Writer propWriter = new OutputStreamWriter(new FileOutputStream(tmpFile), StandardCharsets.UTF_8)){
- properties.store(propWriter, topologyName + " properties");
- return FluxParser.parseFile(yamlFile.getAbsolutePath(), false, true, tmpFile.getAbsolutePath(), false);
- }
+ // otherwise, just use the properties directly
+ return FluxParser.parseFile(yamlFile.getAbsolutePath(), false, true, properties, false);
}
-
}
}
diff --git a/metron-platform/metron-pcap-backend/pom.xml b/metron-platform/metron-pcap-backend/pom.xml
index cf49ecf..e21a0b0 100644
--- a/metron-platform/metron-pcap-backend/pom.xml
+++ b/metron-platform/metron-pcap-backend/pom.xml
@@ -56,7 +56,7 @@
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>flux-core</artifactId>
- <version>${global_flux_version}</version>
+ <version>${global_storm_version}</version>
</dependency>
<dependency>
<groupId>org.apache.metron</groupId>
diff --git a/metron-platform/metron-solr/metron-solr-storm/pom.xml b/metron-platform/metron-solr/metron-solr-storm/pom.xml
index ba46af3..889ff5f 100644
--- a/metron-platform/metron-solr/metron-solr-storm/pom.xml
+++ b/metron-platform/metron-solr/metron-solr-storm/pom.xml
@@ -114,7 +114,7 @@
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>flux-core</artifactId>
- <version>${global_flux_version}</version>
+ <version>${global_storm_version}</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
diff --git a/pom.xml b/pom.xml
index 9437439..ec5dd4f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -87,8 +87,6 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<!-- base project versions -->
- <base_storm_version>1.0.1</base_storm_version>
- <base_flux_version>1.0.1</base_flux_version>
<base_kafka_version>0.10.0</base_kafka_version>
<base_hadoop_version>2.7.1</base_hadoop_version>
<base_flume_version>1.5.2</base_flume_version>
@@ -102,7 +100,6 @@
<global_hbase_guava_version>12.0</global_hbase_guava_version>
<global_storm_version>1.0.3</global_storm_version>
<global_storm_kafka_version>1.2.2</global_storm_kafka_version>
- <global_flux_version>${base_flux_version}</global_flux_version>
<global_pcap_version>1.7.1</global_pcap_version>
<global_hadoop_version>${base_hadoop_version}</global_hadoop_version>
<global_flume_version>${base_flume_version}</global_flume_version>
@@ -165,6 +162,7 @@
<global_hbase_version>1.1.1</global_hbase_version>
<global_hbase_guava_version>12.0</global_hbase_guava_version>
<global_storm_kafka_version>1.2.2</global_storm_kafka_version>
+ <base_storm_version>1.0.1</base_storm_version>
<global_storm_version>${base_storm_version}.${hdp_version}-${build_number}</global_storm_version>
<global_kafka_version>${base_kafka_version}.${hdp_version}-${build_number}</global_kafka_version>
<global_zeppelin_version>0.7.3</global_zeppelin_version>