You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/09/07 22:52:21 UTC

incubator-metron git commit: METRON-408 Intermittent Failures of Profile Integration Tests (nickwallen via cestella) closes apache/incubator-metron#245

Repository: incubator-metron
Updated Branches:
  refs/heads/master 7445e328d -> 6a9dd29a6


METRON-408 Intermittent Failures of Profile Integration Tests (nickwallen via cestella) closes apache/incubator-metron#245


Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/6a9dd29a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/6a9dd29a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/6a9dd29a

Branch: refs/heads/master
Commit: 6a9dd29a63e5337a0536315437ca7d89145bdfe4
Parents: 7445e32
Author: nickwallen <ni...@nickallen.org>
Authored: Wed Sep 7 15:52:09 2016 -0700
Committer: cstella <ce...@gmail.com>
Committed: Wed Sep 7 15:52:09 2016 -0700

----------------------------------------------------------------------
 .../integration/ConfigUploadComponent.java      | 133 +++++++++----------
 .../integration/ProfilerIntegrationTest.java    |   4 +-
 .../common/bolt/ConfiguredProfilerBolt.java     |  18 ++-
 .../configuration/ConfigurationsUtils.java      |  10 +-
 .../components/KafkaWithZKComponent.java        |   1 -
 5 files changed, 84 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/6a9dd29a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ConfigUploadComponent.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ConfigUploadComponent.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ConfigUploadComponent.java
index f096d51..b614b15 100644
--- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ConfigUploadComponent.java
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ConfigUploadComponent.java
@@ -20,100 +20,93 @@
 package org.apache.metron.profiler.integration;
 
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.metron.common.configuration.SensorParserConfig;
 import org.apache.metron.integration.InMemoryComponent;
 import org.apache.metron.integration.UnableToStartException;
 import org.apache.metron.integration.components.KafkaWithZKComponent;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
 import java.util.Properties;
 
-import static org.apache.metron.common.configuration.ConfigurationsUtils.*;
+import static org.apache.metron.common.configuration.ConfigurationsUtils.getClient;
+import static org.apache.metron.common.configuration.ConfigurationsUtils.readGlobalConfigFromFile;
+import static org.apache.metron.common.configuration.ConfigurationsUtils.writeGlobalConfigToZookeeper;
+import static org.apache.metron.common.configuration.ConfigurationsUtils.readProfilerConfigFromFile;
+import static org.apache.metron.common.configuration.ConfigurationsUtils.writeProfilerConfigToZookeeper;
 
+
+/**
+ * Uploads configuration to Zookeeper.
+ */
 public class ConfigUploadComponent implements InMemoryComponent {
 
   private Properties topologyProperties;
-  private String globalConfigPath;
-  private String parserConfigsPath;
-  private String enrichmentConfigsPath;
-  private String profilerConfigPath;
-  private Optional<String> globalConfig = Optional.empty();
-  private Map<String, SensorParserConfig> parserSensorConfigs = new HashMap<>();
-
-  public ConfigUploadComponent withTopologyProperties(Properties topologyProperties) {
-    this.topologyProperties = topologyProperties;
-    return this;
-  }
+  private String globalConfiguration;
+  private String profilerConfiguration;
 
-  public ConfigUploadComponent withGlobalConfigsPath(String globalConfigPath) {
-    this.globalConfigPath = globalConfigPath;
-    return this;
-  }
-
-  public ConfigUploadComponent withParserConfigsPath(String parserConfigsPath) {
-    this.parserConfigsPath = parserConfigsPath;
-    return this;
-  }
-  public ConfigUploadComponent withEnrichmentConfigsPath(String enrichmentConfigsPath) {
-    this.enrichmentConfigsPath = enrichmentConfigsPath;
-    return this;
-  }
-
-  public ConfigUploadComponent withProfilerConfigsPath(String profilerConfigsPath) {
-    this.profilerConfigPath = profilerConfigsPath;
-    return this;
+  @Override
+  public void start() throws UnableToStartException {
+    try {
+      upload();
+    } catch (Exception e) {
+      throw new UnableToStartException(e.getMessage(), e);
+    }
   }
 
-  public ConfigUploadComponent withParserSensorConfig(String name, SensorParserConfig config) {
-    parserSensorConfigs.put(name, config);
-    return this;
+  @Override
+  public void stop() {
+    // nothing to do
   }
 
-  public ConfigUploadComponent withGlobalConfig(String globalConfig) {
-    this.globalConfig = Optional.ofNullable(globalConfig);
-    return this;
+  /**
+   * Uploads configuration to Zookeeper.
+   * @throws Exception
+   */
+  private void upload() throws Exception {
+    final String zookeeperUrl = topologyProperties.getProperty(KafkaWithZKComponent.ZOOKEEPER_PROPERTY);
+    try(CuratorFramework client = getClient(zookeeperUrl)) {
+      client.start();
+      uploadGlobalConfig(client);
+      uploadProfilerConfig(client);
+    }
   }
 
-  @Override
-  public void start() throws UnableToStartException {
-    try {
-      final String zookeeperUrl = topologyProperties.getProperty(KafkaWithZKComponent.ZOOKEEPER_PROPERTY);
-
-      if(globalConfigPath != null) {
-        uploadConfigsToZookeeper(globalConfigPath, parserConfigsPath, enrichmentConfigsPath, profilerConfigPath, zookeeperUrl);
-      }
-
-      for(Map.Entry<String, SensorParserConfig> kv : parserSensorConfigs.entrySet()) {
-        writeSensorParserConfigToZookeeper(kv.getKey(), kv.getValue(), zookeeperUrl);
+  /**
+   * Upload the profiler configuration to Zookeeper; skipped if no path is set or the file is empty.
+   * @param client The zookeeper client.
+   */
+  private void uploadProfilerConfig(CuratorFramework client) throws Exception {
+    if (profilerConfiguration != null) {
+      byte[] profilerConfig = readProfilerConfigFromFile(profilerConfiguration);
+      if (profilerConfig.length > 0) {
+        writeProfilerConfigToZookeeper(profilerConfig, client);
       }
+    }
+  }
 
-      if(globalConfig.isPresent()) {
-        writeGlobalConfigToZookeeper(globalConfig.get().getBytes(), zookeeperUrl);
+  /**
+   * Upload the global configuration to Zookeeper; skipped if no path is set or the file is empty.
+   * @param client The zookeeper client.
+   */
+  private void uploadGlobalConfig(CuratorFramework client) throws Exception {
+    if (globalConfiguration != null) {
+      byte[] globalConfig = readGlobalConfigFromFile(globalConfiguration);
+      if (globalConfig.length > 0) {
+        writeGlobalConfigToZookeeper(globalConfig, client);
       }
-
-    } catch (Exception e) {
-      throw new UnableToStartException(e.getMessage(), e);
     }
   }
 
-  public SensorParserConfig getSensorParserConfig(String sensorType) {
-    SensorParserConfig sensorParserConfig = new SensorParserConfig();
-    CuratorFramework client = getClient(topologyProperties.getProperty(KafkaWithZKComponent.ZOOKEEPER_PROPERTY));
-    client.start();
-    try {
-      sensorParserConfig = readSensorParserConfigFromZookeeper(sensorType, client);
-    } catch (Exception e) {
-      e.printStackTrace();
-    } finally {
-      client.close();
-    }
-    return sensorParserConfig;
+  public ConfigUploadComponent withTopologyProperties(Properties topologyProperties) {
+    this.topologyProperties = topologyProperties;
+    return this;
   }
 
-  @Override
-  public void stop() {
+  public ConfigUploadComponent withGlobalConfiguration(String path) {
+    this.globalConfiguration = path;
+    return this;
+  }
 
+  public ConfigUploadComponent withProfilerConfiguration(String path) {
+    this.profilerConfiguration = path;
+    return this;
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/6a9dd29a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
index 9513305..66c1308 100644
--- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
@@ -280,8 +280,8 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
     // upload profiler configuration to zookeeper
     ConfigUploadComponent configUploadComponent = new ConfigUploadComponent()
             .withTopologyProperties(topologyProperties)
-            .withGlobalConfigsPath(pathToConfig)
-            .withProfilerConfigsPath(pathToConfig);
+            .withGlobalConfiguration(pathToConfig)
+            .withProfilerConfiguration(pathToConfig);
 
     // load flux definition for the profiler topology
     fluxComponent = new FluxTopologyComponent.Builder()

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/6a9dd29a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredProfilerBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredProfilerBolt.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredProfilerBolt.java
index bc1e5d6..c934e6d 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredProfilerBolt.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredProfilerBolt.java
@@ -17,21 +17,24 @@
  */
 package org.apache.metron.common.bolt;
 
+import org.apache.curator.framework.CuratorFramework;
 import org.apache.log4j.Logger;
 import org.apache.metron.common.configuration.ConfigurationType;
-import org.apache.metron.common.configuration.ConfigurationsUtils;
 import org.apache.metron.common.configuration.profiler.ProfilerConfig;
 import org.apache.metron.common.configuration.profiler.ProfilerConfigurations;
+import org.apache.metron.common.utils.JSONUtils;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
 
+import static org.apache.metron.common.configuration.ConfigurationType.PROFILER;
+
 /**
  * A bolt used in the Profiler topology that is configured with values stored in Zookeeper.
  */
 public abstract class ConfiguredProfilerBolt extends ConfiguredBolt<ProfilerConfigurations> {
 
   private static final Logger LOG = Logger.getLogger(ConfiguredProfilerBolt.class);
-  protected final ProfilerConfigurations configurations = new ProfilerConfigurations();
 
   public ConfiguredProfilerBolt(String zookeeperUrl) {
     super(zookeeperUrl);
@@ -49,12 +52,21 @@ public abstract class ConfiguredProfilerBolt extends ConfiguredBolt<ProfilerConf
   @Override
   public void loadConfig() {
     try {
-      ConfigurationsUtils.updateProfilerConfigsFromZookeeper(getConfigurations(), client);
+      ProfilerConfig config = readFromZookeeper(client);
+      if(config != null) {
+        getConfigurations().updateProfilerConfig(config);
+      }
+
     } catch (Exception e) {
       LOG.warn("Unable to load configs from zookeeper, but the cache should load lazily...");
     }
   }
 
+  private ProfilerConfig readFromZookeeper(CuratorFramework client) throws Exception {
+    byte[] raw = client.getData().forPath(PROFILER.getZookeeperRoot());
+    return JSONUtils.INSTANCE.load(new ByteArrayInputStream(raw), ProfilerConfig.class);
+  }
+
   @Override
   public void updateConfig(String path, byte[] data) throws IOException {
     if (data.length != 0) {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/6a9dd29a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationsUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationsUtils.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationsUtils.java
index 1411f9c..12a416a 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationsUtils.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationsUtils.java
@@ -24,7 +24,6 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
-import org.apache.metron.common.configuration.profiler.ProfilerConfigurations;
 import org.apache.metron.common.utils.JSONUtils;
 import org.apache.zookeeper.KeeperException;
 
@@ -37,7 +36,10 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import static org.apache.metron.common.configuration.ConfigurationType.*;
+import static org.apache.metron.common.configuration.ConfigurationType.ENRICHMENT;
+import static org.apache.metron.common.configuration.ConfigurationType.GLOBAL;
+import static org.apache.metron.common.configuration.ConfigurationType.PARSER;
+import static org.apache.metron.common.configuration.ConfigurationType.PROFILER;
 
 public class ConfigurationsUtils {
 
@@ -158,10 +160,6 @@ public class ConfigurationsUtils {
     }
   }
 
-  public static void updateProfilerConfigsFromZookeeper(ProfilerConfigurations configurations, CuratorFramework client) throws Exception {
-    updateConfigsFromZookeeper(configurations, client);
-  }
-
   public static SensorEnrichmentConfig readSensorEnrichmentConfigFromZookeeper(String sensorType, CuratorFramework client) throws Exception {
     return JSONUtils.INSTANCE.load(new ByteArrayInputStream(readFromZookeeper(ENRICHMENT.getZookeeperRoot() + "/" + sensorType, client)), SensorEnrichmentConfig.class);
   }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/6a9dd29a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaWithZKComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaWithZKComponent.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaWithZKComponent.java
index ffe7b54..8ffc5e7 100644
--- a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaWithZKComponent.java
+++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaWithZKComponent.java
@@ -166,7 +166,6 @@ public class KafkaWithZKComponent implements InMemoryComponent {
     if(zkServer != null) {
       zkServer.shutdown();
     }
-
   }
 
   public List<byte[]> readMessages(String topic) {