You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/09/07 22:52:21 UTC
incubator-metron git commit: METRON-408 Intermittent Failures of
Profile Integration Tests (nickwallen via cestella) closes
apache/incubator-metron#245
Repository: incubator-metron
Updated Branches:
refs/heads/master 7445e328d -> 6a9dd29a6
METRON-408 Intermittent Failures of Profile Integration Tests (nickwallen via cestella) closes apache/incubator-metron#245
Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/6a9dd29a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/6a9dd29a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/6a9dd29a
Branch: refs/heads/master
Commit: 6a9dd29a63e5337a0536315437ca7d89145bdfe4
Parents: 7445e32
Author: nickwallen <ni...@nickallen.org>
Authored: Wed Sep 7 15:52:09 2016 -0700
Committer: cstella <ce...@gmail.com>
Committed: Wed Sep 7 15:52:09 2016 -0700
----------------------------------------------------------------------
.../integration/ConfigUploadComponent.java | 133 +++++++++----------
.../integration/ProfilerIntegrationTest.java | 4 +-
.../common/bolt/ConfiguredProfilerBolt.java | 18 ++-
.../configuration/ConfigurationsUtils.java | 10 +-
.../components/KafkaWithZKComponent.java | 1 -
5 files changed, 84 insertions(+), 82 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/6a9dd29a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ConfigUploadComponent.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ConfigUploadComponent.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ConfigUploadComponent.java
index f096d51..b614b15 100644
--- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ConfigUploadComponent.java
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ConfigUploadComponent.java
@@ -20,100 +20,93 @@
package org.apache.metron.profiler.integration;
import org.apache.curator.framework.CuratorFramework;
-import org.apache.metron.common.configuration.SensorParserConfig;
import org.apache.metron.integration.InMemoryComponent;
import org.apache.metron.integration.UnableToStartException;
import org.apache.metron.integration.components.KafkaWithZKComponent;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
import java.util.Properties;
-import static org.apache.metron.common.configuration.ConfigurationsUtils.*;
+import static org.apache.metron.common.configuration.ConfigurationsUtils.getClient;
+import static org.apache.metron.common.configuration.ConfigurationsUtils.readGlobalConfigFromFile;
+import static org.apache.metron.common.configuration.ConfigurationsUtils.writeGlobalConfigToZookeeper;
+import static org.apache.metron.common.configuration.ConfigurationsUtils.readProfilerConfigFromFile;
+import static org.apache.metron.common.configuration.ConfigurationsUtils.writeProfilerConfigToZookeeper;
+
+/**
+ * Uploads configuration to Zookeeper.
+ */
public class ConfigUploadComponent implements InMemoryComponent {
private Properties topologyProperties;
- private String globalConfigPath;
- private String parserConfigsPath;
- private String enrichmentConfigsPath;
- private String profilerConfigPath;
- private Optional<String> globalConfig = Optional.empty();
- private Map<String, SensorParserConfig> parserSensorConfigs = new HashMap<>();
-
- public ConfigUploadComponent withTopologyProperties(Properties topologyProperties) {
- this.topologyProperties = topologyProperties;
- return this;
- }
+ private String globalConfiguration;
+ private String profilerConfiguration;
- public ConfigUploadComponent withGlobalConfigsPath(String globalConfigPath) {
- this.globalConfigPath = globalConfigPath;
- return this;
- }
-
- public ConfigUploadComponent withParserConfigsPath(String parserConfigsPath) {
- this.parserConfigsPath = parserConfigsPath;
- return this;
- }
- public ConfigUploadComponent withEnrichmentConfigsPath(String enrichmentConfigsPath) {
- this.enrichmentConfigsPath = enrichmentConfigsPath;
- return this;
- }
-
- public ConfigUploadComponent withProfilerConfigsPath(String profilerConfigsPath) {
- this.profilerConfigPath = profilerConfigsPath;
- return this;
+ @Override
+ public void start() throws UnableToStartException {
+ try {
+ upload();
+ } catch (Exception e) {
+ throw new UnableToStartException(e.getMessage(), e);
+ }
}
- public ConfigUploadComponent withParserSensorConfig(String name, SensorParserConfig config) {
- parserSensorConfigs.put(name, config);
- return this;
+ @Override
+ public void stop() {
+ // nothing to do
}
- public ConfigUploadComponent withGlobalConfig(String globalConfig) {
- this.globalConfig = Optional.ofNullable(globalConfig);
- return this;
+ /**
+ * Uploads configuration to Zookeeper.
+ * @throws Exception
+ */
+ private void upload() throws Exception {
+ final String zookeeperUrl = topologyProperties.getProperty(KafkaWithZKComponent.ZOOKEEPER_PROPERTY);
+ try(CuratorFramework client = getClient(zookeeperUrl)) {
+ client.start();
+ uploadGlobalConfig(client);
+ uploadProfilerConfig(client);
+ }
}
- @Override
- public void start() throws UnableToStartException {
- try {
- final String zookeeperUrl = topologyProperties.getProperty(KafkaWithZKComponent.ZOOKEEPER_PROPERTY);
-
- if(globalConfigPath != null) {
- uploadConfigsToZookeeper(globalConfigPath, parserConfigsPath, enrichmentConfigsPath, profilerConfigPath, zookeeperUrl);
- }
-
- for(Map.Entry<String, SensorParserConfig> kv : parserSensorConfigs.entrySet()) {
- writeSensorParserConfigToZookeeper(kv.getKey(), kv.getValue(), zookeeperUrl);
+ /**
+ * Upload the profiler configuration to Zookeeper.
+ * Nothing is uploaded when no profiler configuration path has been set
+ * or when the bytes read from that path are empty.
+ * @param client The zookeeper client.
+ * @throws Exception propagated from reading the file or writing to Zookeeper.
+ */
+ private void uploadProfilerConfig(CuratorFramework client) throws Exception {
+ if (profilerConfiguration != null) {
+ byte[] profilerConfig = readProfilerConfigFromFile(profilerConfiguration);
+ if (profilerConfig.length > 0) {
+ writeProfilerConfigToZookeeper(profilerConfig, client);
}
+ }
+ }
- if(globalConfig.isPresent()) {
- writeGlobalConfigToZookeeper(globalConfig.get().getBytes(), zookeeperUrl);
+ /**
+ * Upload the global configuration to Zookeeper.
+ * Nothing is uploaded when no global configuration path has been set
+ * or when the bytes read from that path are empty.
+ * @param client The zookeeper client.
+ * @throws Exception propagated from reading the file or writing to Zookeeper.
+ */
+ private void uploadGlobalConfig(CuratorFramework client) throws Exception {
+ if (globalConfiguration != null) {
+ byte[] globalConfig = readGlobalConfigFromFile(globalConfiguration);
+ if (globalConfig.length > 0) {
+ writeGlobalConfigToZookeeper(globalConfig, client);
}
-
- } catch (Exception e) {
- throw new UnableToStartException(e.getMessage(), e);
}
}
- public SensorParserConfig getSensorParserConfig(String sensorType) {
- SensorParserConfig sensorParserConfig = new SensorParserConfig();
- CuratorFramework client = getClient(topologyProperties.getProperty(KafkaWithZKComponent.ZOOKEEPER_PROPERTY));
- client.start();
- try {
- sensorParserConfig = readSensorParserConfigFromZookeeper(sensorType, client);
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- client.close();
- }
- return sensorParserConfig;
+ public ConfigUploadComponent withTopologyProperties(Properties topologyProperties) {
+ this.topologyProperties = topologyProperties;
+ return this;
}
- @Override
- public void stop() {
+ public ConfigUploadComponent withGlobalConfiguration(String path) {
+ this.globalConfiguration = path;
+ return this;
+ }
+ public ConfigUploadComponent withProfilerConfiguration(String path) {
+ this.profilerConfiguration = path;
+ return this;
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/6a9dd29a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
index 9513305..66c1308 100644
--- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
@@ -280,8 +280,8 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
// upload profiler configuration to zookeeper
ConfigUploadComponent configUploadComponent = new ConfigUploadComponent()
.withTopologyProperties(topologyProperties)
- .withGlobalConfigsPath(pathToConfig)
- .withProfilerConfigsPath(pathToConfig);
+ .withGlobalConfiguration(pathToConfig)
+ .withProfilerConfiguration(pathToConfig);
// load flux definition for the profiler topology
fluxComponent = new FluxTopologyComponent.Builder()
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/6a9dd29a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredProfilerBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredProfilerBolt.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredProfilerBolt.java
index bc1e5d6..c934e6d 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredProfilerBolt.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredProfilerBolt.java
@@ -17,21 +17,24 @@
*/
package org.apache.metron.common.bolt;
+import org.apache.curator.framework.CuratorFramework;
import org.apache.log4j.Logger;
import org.apache.metron.common.configuration.ConfigurationType;
-import org.apache.metron.common.configuration.ConfigurationsUtils;
import org.apache.metron.common.configuration.profiler.ProfilerConfig;
import org.apache.metron.common.configuration.profiler.ProfilerConfigurations;
+import org.apache.metron.common.utils.JSONUtils;
+import java.io.ByteArrayInputStream;
import java.io.IOException;
+import static org.apache.metron.common.configuration.ConfigurationType.PROFILER;
+
/**
* A bolt used in the Profiler topology that is configured with values stored in Zookeeper.
*/
public abstract class ConfiguredProfilerBolt extends ConfiguredBolt<ProfilerConfigurations> {
private static final Logger LOG = Logger.getLogger(ConfiguredProfilerBolt.class);
- protected final ProfilerConfigurations configurations = new ProfilerConfigurations();
public ConfiguredProfilerBolt(String zookeeperUrl) {
super(zookeeperUrl);
@@ -49,12 +52,21 @@ public abstract class ConfiguredProfilerBolt extends ConfiguredBolt<ProfilerConf
@Override
public void loadConfig() {
try {
- ConfigurationsUtils.updateProfilerConfigsFromZookeeper(getConfigurations(), client);
+ ProfilerConfig config = readFromZookeeper(client);
+ if(config != null) {
+ getConfigurations().updateProfilerConfig(config);
+ }
+
} catch (Exception e) {
LOG.warn("Unable to load configs from zookeeper, but the cache should load lazily...");
}
}
+ private ProfilerConfig readFromZookeeper(CuratorFramework client) throws Exception {
+ byte[] raw = client.getData().forPath(PROFILER.getZookeeperRoot());
+ return JSONUtils.INSTANCE.load(new ByteArrayInputStream(raw), ProfilerConfig.class);
+ }
+
@Override
public void updateConfig(String path, byte[] data) throws IOException {
if (data.length != 0) {
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/6a9dd29a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationsUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationsUtils.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationsUtils.java
index 1411f9c..12a416a 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationsUtils.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationsUtils.java
@@ -24,7 +24,6 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.metron.common.Constants;
import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
-import org.apache.metron.common.configuration.profiler.ProfilerConfigurations;
import org.apache.metron.common.utils.JSONUtils;
import org.apache.zookeeper.KeeperException;
@@ -37,7 +36,10 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import static org.apache.metron.common.configuration.ConfigurationType.*;
+import static org.apache.metron.common.configuration.ConfigurationType.ENRICHMENT;
+import static org.apache.metron.common.configuration.ConfigurationType.GLOBAL;
+import static org.apache.metron.common.configuration.ConfigurationType.PARSER;
+import static org.apache.metron.common.configuration.ConfigurationType.PROFILER;
public class ConfigurationsUtils {
@@ -158,10 +160,6 @@ public class ConfigurationsUtils {
}
}
- public static void updateProfilerConfigsFromZookeeper(ProfilerConfigurations configurations, CuratorFramework client) throws Exception {
- updateConfigsFromZookeeper(configurations, client);
- }
-
public static SensorEnrichmentConfig readSensorEnrichmentConfigFromZookeeper(String sensorType, CuratorFramework client) throws Exception {
return JSONUtils.INSTANCE.load(new ByteArrayInputStream(readFromZookeeper(ENRICHMENT.getZookeeperRoot() + "/" + sensorType, client)), SensorEnrichmentConfig.class);
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/6a9dd29a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaWithZKComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaWithZKComponent.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaWithZKComponent.java
index ffe7b54..8ffc5e7 100644
--- a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaWithZKComponent.java
+++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaWithZKComponent.java
@@ -166,7 +166,6 @@ public class KafkaWithZKComponent implements InMemoryComponent {
if(zkServer != null) {
zkServer.shutdown();
}
-
}
public List<byte[]> readMessages(String topic) {