You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ni...@apache.org on 2019/10/29 15:53:49 UTC

[metron] 02/04: METRON-2301 Building Against Wrong Storm Flux Version (nickwallen) closes apache/metron#1544

This is an automated email from the ASF dual-hosted git repository.

nickallen pushed a commit to branch METRON-2223
in repository https://gitbox.apache.org/repos/asf/metron.git

commit c033bc731a3767d4a61dd72d1d3709d8cf3138f8
Author: nickwallen <ni...@apache.org>
AuthorDate: Tue Oct 29 10:56:25 2019 -0400

    METRON-2301 Building Against Wrong Storm Flux Version (nickwallen) closes apache/metron#1544
---
 .../metron-common-storm/pom.xml                    |  2 +-
 .../metron-elasticsearch-storm/pom.xml             |  2 +-
 metron-platform/metron-integration-test/pom.xml    |  2 +-
 .../components/FluxTopologyComponent.java          | 69 +++++++++++-----------
 metron-platform/metron-pcap-backend/pom.xml        |  2 +-
 .../metron-solr/metron-solr-storm/pom.xml          |  2 +-
 pom.xml                                            |  4 +-
 7 files changed, 41 insertions(+), 42 deletions(-)

diff --git a/metron-platform/metron-common-streaming/metron-common-storm/pom.xml b/metron-platform/metron-common-streaming/metron-common-storm/pom.xml
index 2b9fbed..4426802 100644
--- a/metron-platform/metron-common-streaming/metron-common-storm/pom.xml
+++ b/metron-platform/metron-common-streaming/metron-common-storm/pom.xml
@@ -72,7 +72,7 @@
       <dependency>
         <groupId>org.apache.storm</groupId>
         <artifactId>flux-core</artifactId>
-        <version>${global_flux_version}</version>
+        <version>${global_storm_version}</version>
       </dependency>
       <dependency>
           <groupId>org.apache.zookeeper</groupId>
diff --git a/metron-platform/metron-elasticsearch/metron-elasticsearch-storm/pom.xml b/metron-platform/metron-elasticsearch/metron-elasticsearch-storm/pom.xml
index a84e45d..3e71aba 100644
--- a/metron-platform/metron-elasticsearch/metron-elasticsearch-storm/pom.xml
+++ b/metron-platform/metron-elasticsearch/metron-elasticsearch-storm/pom.xml
@@ -62,7 +62,7 @@
         <dependency>
             <groupId>org.apache.storm</groupId>
             <artifactId>flux-core</artifactId>
-            <version>${global_flux_version}</version>
+            <version>${global_storm_version}</version>
         </dependency>
         <dependency>
             <groupId>org.apache.storm</groupId>
diff --git a/metron-platform/metron-integration-test/pom.xml b/metron-platform/metron-integration-test/pom.xml
index 6bb8291..3906a8f 100644
--- a/metron-platform/metron-integration-test/pom.xml
+++ b/metron-platform/metron-integration-test/pom.xml
@@ -45,7 +45,7 @@
     <dependency>
       <groupId>org.apache.storm</groupId>
       <artifactId>flux-core</artifactId>
-      <version>${global_flux_version}</version>
+      <version>${global_storm_version}</version>
     </dependency>
     <dependency>
       <groupId>org.apache.curator</groupId>
diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/FluxTopologyComponent.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/FluxTopologyComponent.java
index 1a1ceb0..85a05b0 100644
--- a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/FluxTopologyComponent.java
+++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/FluxTopologyComponent.java
@@ -17,22 +17,6 @@
  */
 package org.apache.metron.integration.components;
 
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-import java.lang.invoke.MethodHandles;
-import java.lang.reflect.InvocationTargetException;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.Comparator;
-import java.util.Map;
-import java.util.Properties;
-import java.util.regex.Pattern;
 import org.apache.commons.io.FileUtils;
 import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
@@ -55,6 +39,19 @@ import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.io.IOException;
+import java.io.StringReader;
+import java.lang.invoke.MethodHandles;
+import java.lang.reflect.InvocationTargetException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Comparator;
+import java.util.Map;
+import java.util.Properties;
+import java.util.regex.Pattern;
+
 public class FluxTopologyComponent implements InMemoryComponent {
 
   protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -249,7 +246,7 @@ public class FluxTopologyComponent implements InMemoryComponent {
   }
 
   private void startTopology(String topologyName, File topologyLoc, File templateFile, Properties properties) throws IOException, ClassNotFoundException, NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException, TException, NoSuchFieldException{
-    TopologyDef topologyDef = loadYaml(topologyName, topologyLoc, templateFile, properties);
+    TopologyDef topologyDef = loadYaml(topologyLoc, templateFile, properties);
     Config conf = FluxBuilder.buildConfig(topologyDef);
     ExecutionContext context = new ExecutionContext(topologyDef, conf);
     StormTopology topology = FluxBuilder.buildTopology(context);
@@ -267,26 +264,30 @@ public class FluxTopologyComponent implements InMemoryComponent {
     }
   }
 
-  private static TopologyDef loadYaml(String topologyName, File yamlFile, File templateFile, Properties properties) throws IOException {
-    File tmpFile = File.createTempFile(topologyName, "props");
-    tmpFile.deleteOnExit();
+  /**
+   * Loads the topology definition from a Flux file, applying the topology properties.
+   * @param yamlFile The Flux file defining the topology.
+   * @param templateFile The template file used by the Mpack to create the topology's properties. For example, 'enrichment.properties.j2'.
+   * @param properties The topology properties.
+   * @return The topology definition parsed from the Flux file.
+   * @throws IOException If the template file or the Flux file cannot be read.
+   */
+  private static TopologyDef loadYaml(File yamlFile, File templateFile, Properties properties) throws IOException {
+    Properties topologyProperties;
     if (templateFile != null) {
-      try (Writer propWriter = new OutputStreamWriter(new FileOutputStream(tmpFile), StandardCharsets.UTF_8)){
-        String templateContents = FileUtils.readFileToString(templateFile);
-        for(Map.Entry prop: properties.entrySet()) {
-          String replacePattern = String.format("{{%s}}", prop.getKey());
-          templateContents = templateContents.replaceAll(Pattern.quote(replacePattern), (String) prop.getValue());
-        }
-        propWriter.write(templateContents);
-        propWriter.flush();
-        return FluxParser.parseFile(yamlFile.getAbsolutePath(), false, true, tmpFile.getAbsolutePath(), false);
+      // use the MPack template file (like 'enrichment.properties.j2') to generate the topology properties
+      String templateContents = FileUtils.readFileToString(templateFile);
+      for(Map.Entry prop: properties.entrySet()) {
+        String replacePattern = String.format("{{%s}}", prop.getKey());
+        templateContents = templateContents.replaceAll(Pattern.quote(replacePattern), (String) prop.getValue());
       }
+      topologyProperties = new Properties();
+      topologyProperties.load(new StringReader(templateContents));
+      return FluxParser.parseFile(yamlFile.getAbsolutePath(), false, true, topologyProperties, false);
+
     } else {
-      try (Writer propWriter = new OutputStreamWriter(new FileOutputStream(tmpFile), StandardCharsets.UTF_8)){
-        properties.store(propWriter, topologyName + " properties");
-        return FluxParser.parseFile(yamlFile.getAbsolutePath(), false, true, tmpFile.getAbsolutePath(), false);
-      }
+      // otherwise, just use the properties directly
+      return FluxParser.parseFile(yamlFile.getAbsolutePath(), false, true, properties, false);
     }
-
   }
 }
diff --git a/metron-platform/metron-pcap-backend/pom.xml b/metron-platform/metron-pcap-backend/pom.xml
index cf49ecf..e21a0b0 100644
--- a/metron-platform/metron-pcap-backend/pom.xml
+++ b/metron-platform/metron-pcap-backend/pom.xml
@@ -56,7 +56,7 @@
         <dependency>
             <groupId>org.apache.storm</groupId>
             <artifactId>flux-core</artifactId>
-            <version>${global_flux_version}</version>
+            <version>${global_storm_version}</version>
         </dependency>
         <dependency>
             <groupId>org.apache.metron</groupId>
diff --git a/metron-platform/metron-solr/metron-solr-storm/pom.xml b/metron-platform/metron-solr/metron-solr-storm/pom.xml
index ba46af3..889ff5f 100644
--- a/metron-platform/metron-solr/metron-solr-storm/pom.xml
+++ b/metron-platform/metron-solr/metron-solr-storm/pom.xml
@@ -114,7 +114,7 @@
         <dependency>
             <groupId>org.apache.storm</groupId>
             <artifactId>flux-core</artifactId>
-            <version>${global_flux_version}</version>
+            <version>${global_storm_version}</version>
         </dependency>
         <dependency>
             <groupId>org.apache.storm</groupId>
diff --git a/pom.xml b/pom.xml
index 93e9da7..90de889 100644
--- a/pom.xml
+++ b/pom.xml
@@ -87,8 +87,6 @@
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
         <!-- base project versions -->
-        <base_storm_version>1.0.1</base_storm_version>
-        <base_flux_version>1.0.1</base_flux_version>
         <base_kafka_version>0.10.0</base_kafka_version>
         <base_hadoop_version>2.7.1</base_hadoop_version>
         <base_flume_version>1.5.2</base_flume_version>
@@ -102,7 +100,6 @@
         <global_hbase_guava_version>12.0</global_hbase_guava_version>
         <global_storm_version>1.0.3</global_storm_version>
         <global_storm_kafka_version>1.2.2</global_storm_kafka_version>
-        <global_flux_version>${base_flux_version}</global_flux_version>
         <global_pcap_version>1.7.1</global_pcap_version>
         <global_hadoop_version>${base_hadoop_version}</global_hadoop_version>
         <global_flume_version>${base_flume_version}</global_flume_version>
@@ -166,6 +163,7 @@
                 <global_hbase_version>1.1.1</global_hbase_version>
                 <global_hbase_guava_version>12.0</global_hbase_guava_version>
                 <global_storm_kafka_version>1.2.2</global_storm_kafka_version>
+                <base_storm_version>1.0.1</base_storm_version>
                 <global_storm_version>${base_storm_version}.${hdp_version}-${build_number}</global_storm_version>
                 <global_kafka_version>${base_kafka_version}.${hdp_version}-${build_number}</global_kafka_version>
                 <global_zeppelin_version>0.7.3</global_zeppelin_version>