You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text rendering.
Posted to commits@metron.apache.org by ot...@apache.org on 2018/04/18 15:00:04 UTC
[34/52] [abbrv] metron git commit: METRON-1505 Intermittent Profiler Integration Test Failure (nickwallen) closes apache/metron#977
METRON-1505 Intermittent Profiler Integration Test Failure (nickwallen) closes apache/metron#977
Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/46bc63db
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/46bc63db
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/46bc63db
Branch: refs/heads/feature/METRON-1211-extensions-parsers-gradual
Commit: 46bc63dbcfe9f0ddabfd4821958962a2dac9378e
Parents: ab4f8e6
Author: nickwallen <ni...@nickallen.org>
Authored: Sat Apr 7 11:28:01 2018 -0400
Committer: nickallen <ni...@apache.org>
Committed: Sat Apr 7 11:28:01 2018 -0400
----------------------------------------------------------------------
.../profiler/DefaultMessageDistributor.java | 54 +++-
.../src/main/flux/profiler/remote.yaml | 2 +
.../profiler/bolt/ProfileBuilderBolt.java | 149 +++++++---
.../profiler/bolt/ProfileSplitterBolt.java | 1 -
.../config/zookeeper/percentiles/profiler.json | 12 -
.../processing-time-test/profiler.json | 11 +
.../zookeeper/readme-example-1/profiler.json | 17 --
.../zookeeper/readme-example-2/profiler.json | 18 --
.../zookeeper/readme-example-3/profiler.json | 11 -
.../zookeeper/readme-example-4/profiler.json | 11 -
.../profiler/bolt/ProfileBuilderBoltTest.java | 130 +++------
.../integration/ProfilerIntegrationTest.java | 274 +++++--------------
.../configuration/profiler/ProfileConfig.java | 49 ++--
.../ZKConfigurationsCacheIntegrationTest.java | 4 +-
.../org/apache/metron/hbase/bolt/HBaseBolt.java | 22 +-
15 files changed, 319 insertions(+), 446 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/metron/blob/46bc63db/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java
index ea5126f..70f4228 100644
--- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java
@@ -25,6 +25,7 @@ import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
+import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.metron.common.configuration.profiler.ProfileConfig;
import org.apache.metron.stellar.dsl.Context;
import org.json.simple.JSONObject;
@@ -33,7 +34,6 @@ import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
@@ -74,7 +74,7 @@ public class DefaultMessageDistributor implements MessageDistributor {
* messages. Once it has not received messages for a period of time, it is
* moved to the expired cache.
*/
- private transient Cache<String, ProfileBuilder> activeCache;
+ private transient Cache<Integer, ProfileBuilder> activeCache;
/**
* A cache of expired profiles.
@@ -85,7 +85,7 @@ public class DefaultMessageDistributor implements MessageDistributor {
* can flush the state of the expired profile. If the client does not flush
* the expired profiles, this state will be lost forever.
*/
- private transient Cache<String, ProfileBuilder> expiredCache;
+ private transient Cache<Integer, ProfileBuilder> expiredCache;
/**
* Create a new message distributor.
@@ -222,7 +222,7 @@ public class DefaultMessageDistributor implements MessageDistributor {
* @param cache The cache to flush.
* @return The measurements captured when flushing the profiles.
*/
- private List<ProfileMeasurement> flushCache(Cache<String, ProfileBuilder> cache) {
+ private List<ProfileMeasurement> flushCache(Cache<Integer, ProfileBuilder> cache) {
List<ProfileMeasurement> measurements = new ArrayList<>();
for(ProfileBuilder profileBuilder: cache.asMap().values()) {
@@ -262,11 +262,19 @@ public class DefaultMessageDistributor implements MessageDistributor {
/**
* Builds the key that is used to lookup the {@link ProfileBuilder} within the cache.
*
+ * <p>The cache key is built using the hash codes of the profile and entity name. If the profile
+ * definition is ever changed, the same cache entry will not be reused. This ensures that no
+ * state can be carried over from the old definition into the new, which might result in an
+ * invalid profile measurement.
+ *
* @param profile The profile definition.
* @param entity The entity.
*/
- private String cacheKey(ProfileConfig profile, String entity) {
- return format("%s:%s", profile, entity);
+ private int cacheKey(ProfileConfig profile, String entity) {
+ return new HashCodeBuilder(17, 37)
+ .append(profile)
+ .append(entity)
+ .hashCode();
}
public DefaultMessageDistributor withPeriodDurationMillis(long periodDurationMillis) {
@@ -281,29 +289,45 @@ public class DefaultMessageDistributor implements MessageDistributor {
/**
* A listener that is notified when profiles expire from the active cache.
*/
- private class ActiveCacheRemovalListener implements RemovalListener<String, ProfileBuilder> {
+ private class ActiveCacheRemovalListener implements RemovalListener<Integer, ProfileBuilder> {
@Override
- public void onRemoval(RemovalNotification<String, ProfileBuilder> notification) {
+ public void onRemoval(RemovalNotification<Integer, ProfileBuilder> notification) {
- String key = notification.getKey();
ProfileBuilder expired = notification.getValue();
+ LOG.warn("Profile expired from active cache; profile={}, entity={}",
+ expired.getDefinition().getProfile(),
+ expired.getEntity());
- LOG.warn("Profile expired from active cache; key={}", key);
- expiredCache.put(key, expired);
+ // add the profile to the expired cache
+ expiredCache.put(notification.getKey(), expired);
}
}
/**
* A listener that is notified when profiles expire from the active cache.
*/
- private class ExpiredCacheRemovalListener implements RemovalListener<String, ProfileBuilder> {
+ private class ExpiredCacheRemovalListener implements RemovalListener<Integer, ProfileBuilder> {
@Override
- public void onRemoval(RemovalNotification<String, ProfileBuilder> notification) {
+ public void onRemoval(RemovalNotification<Integer, ProfileBuilder> notification) {
+
+ if(notification.wasEvicted()) {
+
+ // the expired profile was NOT flushed in time
+ ProfileBuilder expired = notification.getValue();
+ LOG.warn("Expired profile NOT flushed before removal, some state lost; profile={}, entity={}",
+ expired.getDefinition().getProfile(),
+ expired.getEntity());
- String key = notification.getKey();
- LOG.debug("Profile removed from expired cache; key={}", key);
+ } else {
+
+ // the expired profile was flushed successfully
+ ProfileBuilder expired = notification.getValue();
+ LOG.debug("Expired profile successfully flushed; profile={}, entity={}",
+ expired.getDefinition().getProfile(),
+ expired.getEntity());
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/metron/blob/46bc63db/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml b/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml
index 83c9fde..6ad007b 100644
--- a/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml
+++ b/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml
@@ -160,6 +160,8 @@ bolts:
args: [ref: "windowLag"]
- name: "withMaxNumberOfRoutes"
args: [${profiler.max.routes.per.bolt}]
+ - name: "withTimestampField"
+ args: ["timestamp"]
- id: "hbaseBolt"
className: "org.apache.metron.hbase.bolt.HBaseBolt"
http://git-wip-us.apache.org/repos/asf/metron/blob/46bc63db/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
index ffe823f..fb3d2d0 100644
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
@@ -42,13 +42,13 @@ import org.apache.metron.stellar.common.utils.ConversionUtils;
import org.apache.metron.stellar.dsl.Context;
import org.apache.metron.zookeeper.SimpleEventListener;
import org.apache.metron.zookeeper.ZKCache;
-import org.apache.storm.Config;
+import org.apache.storm.StormTimer;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.tuple.Tuple;
-import org.apache.storm.utils.TupleUtils;
+import org.apache.storm.utils.Utils;
import org.apache.storm.windowing.TupleWindow;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
@@ -60,6 +60,8 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.function.Supplier;
import static java.lang.String.format;
import static org.apache.metron.profiler.bolt.ProfileSplitterBolt.ENTITY_TUPLE_FIELD;
@@ -127,6 +129,9 @@ public class ProfileBuilderBolt extends BaseWindowedBolt implements Reloadable {
/**
* Distributes messages to the profile builders.
+ *
+ * <p>Since expired profiles are flushed on a separate thread, all access to this
+ * {@code MessageDistributor} needs to be protected.
*/
private MessageDistributor messageDistributor;
@@ -145,9 +150,21 @@ public class ProfileBuilderBolt extends BaseWindowedBolt implements Reloadable {
private List<ProfileMeasurementEmitter> emitters;
/**
- * Signals when it is time to flush.
+ * Signals when it is time to flush the active profiles.
+ */
+ private FlushSignal activeFlushSignal;
+
+ /**
+ * A timer that flushes expired profiles on a regular interval. The expired profiles
+ * are flushed on a separate thread.
+ *
+ * <p>Flushing expired profiles ensures that any profiles that stop receiving messages
+ * for an extended period of time will continue to be flushed.
+ *
+ * <p>This introduces concurrency issues as the bolt is no longer single threaded. Due
+ * to this, all access to the {@code MessageDistributor} needs to be protected.
*/
- private FlushSignal flushSignal;
+ private StormTimer expiredFlushTimer;
public ProfileBuilderBolt() {
this.emitters = new ArrayList<>();
@@ -183,16 +200,26 @@ public class ProfileBuilderBolt extends BaseWindowedBolt implements Reloadable {
this.parser = new JSONParser();
this.messageDistributor = new DefaultMessageDistributor(periodDurationMillis, profileTimeToLiveMillis, maxNumberOfRoutes);
this.configurations = new ProfilerConfigurations();
- this.flushSignal = new FixedFrequencyFlushSignal(periodDurationMillis);
+ this.activeFlushSignal = new FixedFrequencyFlushSignal(periodDurationMillis);
setupZookeeper();
+ startExpiredFlushTimer();
}
@Override
public void cleanup() {
- zookeeperCache.close();
- zookeeperClient.close();
+ try {
+ zookeeperCache.close();
+ zookeeperClient.close();
+ expiredFlushTimer.close();
+
+ } catch(Throwable e) {
+ LOG.error("Exception when cleaning up", e);
+ }
}
+ /**
+ * Setup connectivity to Zookeeper which provides the necessary configuration for the bolt.
+ */
private void setupZookeeper() {
try {
if (zookeeperClient == null) {
@@ -248,18 +275,6 @@ public class ProfileBuilderBolt extends BaseWindowedBolt implements Reloadable {
emitters.forEach(emitter -> emitter.declareOutputFields(declarer));
}
- /**
- * Defines the frequency at which the bolt will receive tick tuples. Tick tuples are
- * used to control how often a profile is flushed.
- */
- @Override
- public Map<String, Object> getComponentConfiguration() {
-
- Map<String, Object> conf = super.getComponentConfiguration();
- conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, TimeUnit.MILLISECONDS.toSeconds(profileTimeToLiveMillis));
- return conf;
- }
-
private Context getStellarContext() {
Map<String, Object> global = getConfigurations().getGlobalConfig();
@@ -282,24 +297,12 @@ public class ProfileBuilderBolt extends BaseWindowedBolt implements Reloadable {
// handle each tuple in the window
for(Tuple tuple : window.get()) {
-
- if(TupleUtils.isTick(tuple)) {
- handleTick();
-
- } else {
- handleMessage(tuple);
- }
+ handleMessage(tuple);
}
- // time to flush?
- if(flushSignal.isTimeToFlush()) {
- flushSignal.reset();
-
- // flush the active profiles
- List<ProfileMeasurement> measurements = messageDistributor.flush();
- emitMeasurements(measurements);
-
- LOG.debug("Flushed active profiles and found {} measurement(s).", measurements.size());
+ // time to flush active profiles?
+ if(activeFlushSignal.isTimeToFlush()) {
+ flushActive();
}
} catch (Throwable e) {
@@ -310,17 +313,37 @@ public class ProfileBuilderBolt extends BaseWindowedBolt implements Reloadable {
}
/**
- * Flush all expired profiles when a 'tick' is received.
+ * Flush all active profiles.
+ */
+ protected void flushActive() {
+ activeFlushSignal.reset();
+
+ // flush the active profiles
+ List<ProfileMeasurement> measurements;
+ synchronized(messageDistributor) {
+ measurements = messageDistributor.flush();
+ emitMeasurements(measurements);
+ }
+
+ LOG.debug("Flushed active profiles and found {} measurement(s).", measurements.size());
+
+ }
+
+ /**
+ * Flushes all expired profiles.
*
- * If a profile has not received a message for an extended period of time then it is
+ * <p>If a profile has not received a message for an extended period of time then it is
* marked as expired. Periodically we need to flush these expired profiles to ensure
* that their state is not lost.
*/
- private void handleTick() {
+ protected void flushExpired() {
// flush the expired profiles
- List<ProfileMeasurement> measurements = messageDistributor.flushExpired();
- emitMeasurements(measurements);
+ List<ProfileMeasurement> measurements;
+ synchronized (messageDistributor) {
+ measurements = messageDistributor.flushExpired();
+ emitMeasurements(measurements);
+ }
LOG.debug("Flushed expired profiles and found {} measurement(s).", measurements.size());
}
@@ -339,11 +362,13 @@ public class ProfileBuilderBolt extends BaseWindowedBolt implements Reloadable {
Long timestamp = getField(TIMESTAMP_TUPLE_FIELD, input, Long.class);
// keep track of time
- flushSignal.update(timestamp);
+ activeFlushSignal.update(timestamp);
// distribute the message
MessageRoute route = new MessageRoute(definition, entity);
- messageDistributor.distribute(message, timestamp, route, getStellarContext());
+ synchronized (messageDistributor) {
+ messageDistributor.distribute(message, timestamp, route, getStellarContext());
+ }
LOG.debug("Message distributed: profile={}, entity={}, timestamp={}", definition.getProfile(), entity, timestamp);
}
@@ -395,10 +420,46 @@ public class ProfileBuilderBolt extends BaseWindowedBolt implements Reloadable {
return value;
}
+ /**
+ * Converts milliseconds to seconds and handles an ugly cast.
+ *
+ * @param millis Duration in milliseconds.
+ * @return Duration in seconds.
+ */
+ private int toSeconds(long millis) {
+ return (int) TimeUnit.MILLISECONDS.toSeconds(millis);
+ }
+
+ /**
+ * Creates a timer that regularly flushes expired profiles on a separate thread.
+ */
+ private void startExpiredFlushTimer() {
+
+ expiredFlushTimer = createTimer("flush-expired-profiles-timer");
+ expiredFlushTimer.scheduleRecurring(0, toSeconds(profileTimeToLiveMillis), () -> flushExpired());
+ }
+
+ /**
+ * Creates a timer that can execute a task on a fixed interval.
+ *
+ * <p>If the timer encounters an exception, the entire process will be killed.
+ *
+ * @param name The name of the timer.
+ * @return The timer.
+ */
+ private StormTimer createTimer(String name) {
+
+ return new StormTimer(name, (thread, exception) -> {
+ String msg = String.format("Unexpected exception in timer task; timer=%s", name);
+ LOG.error(msg, exception);
+ Utils.exitProcess(1, msg);
+ });
+ }
+
@Override
public BaseWindowedBolt withTumblingWindow(BaseWindowedBolt.Duration duration) {
- // need to capture the window duration for setting the flush count down
+ // need to capture the window duration to validate it along with other profiler settings
this.windowDurationMillis = duration.value;
return super.withTumblingWindow(duration);
}
@@ -464,7 +525,7 @@ public class ProfileBuilderBolt extends BaseWindowedBolt implements Reloadable {
}
public ProfileBuilderBolt withFlushSignal(FlushSignal flushSignal) {
- this.flushSignal = flushSignal;
+ this.activeFlushSignal = flushSignal;
return this;
}
http://git-wip-us.apache.org/repos/asf/metron/blob/46bc63db/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java
index 4e62eee..a92a432 100644
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java
@@ -21,7 +21,6 @@
package org.apache.metron.profiler.bolt;
import org.apache.metron.common.bolt.ConfiguredProfilerBolt;
-import org.apache.metron.common.configuration.profiler.ProfileConfig;
import org.apache.metron.common.configuration.profiler.ProfilerConfig;
import org.apache.metron.profiler.DefaultMessageRouter;
import org.apache.metron.profiler.MessageRoute;
http://git-wip-us.apache.org/repos/asf/metron/blob/46bc63db/metron-analytics/metron-profiler/src/test/config/zookeeper/percentiles/profiler.json
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/config/zookeeper/percentiles/profiler.json b/metron-analytics/metron-profiler/src/test/config/zookeeper/percentiles/profiler.json
deleted file mode 100644
index 8a54834..0000000
--- a/metron-analytics/metron-profiler/src/test/config/zookeeper/percentiles/profiler.json
+++ /dev/null
@@ -1,12 +0,0 @@
-{
- "profiles": [
- {
- "profile": "percentiles",
- "foreach": "ip_src_addr",
- "onlyif": "protocol == 'HTTP'",
- "init": { "s": "STATS_INIT(100)" },
- "update": { "s": "STATS_ADD(s, length)" },
- "result": "STATS_PERCENTILE(s, 0.7)"
- }
- ]
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/metron/blob/46bc63db/metron-analytics/metron-profiler/src/test/config/zookeeper/processing-time-test/profiler.json
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/config/zookeeper/processing-time-test/profiler.json b/metron-analytics/metron-profiler/src/test/config/zookeeper/processing-time-test/profiler.json
new file mode 100644
index 0000000..e75ec0f
--- /dev/null
+++ b/metron-analytics/metron-profiler/src/test/config/zookeeper/processing-time-test/profiler.json
@@ -0,0 +1,11 @@
+{
+ "profiles": [
+ {
+ "profile": "processing-time-test",
+ "foreach": "ip_src_addr",
+ "init": { "counter": "0" },
+ "update": { "counter": "counter + 1" },
+ "result": "counter"
+ }
+ ]
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/metron/blob/46bc63db/metron-analytics/metron-profiler/src/test/config/zookeeper/readme-example-1/profiler.json
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/config/zookeeper/readme-example-1/profiler.json b/metron-analytics/metron-profiler/src/test/config/zookeeper/readme-example-1/profiler.json
deleted file mode 100644
index 96c60a1..0000000
--- a/metron-analytics/metron-profiler/src/test/config/zookeeper/readme-example-1/profiler.json
+++ /dev/null
@@ -1,17 +0,0 @@
-{
- "profiles": [
- {
- "profile": "example1",
- "foreach": "ip_src_addr",
- "onlyif": "protocol == 'HTTP'",
- "init": {
- "total_bytes": 0.0
- },
- "update": {
- "total_bytes": "total_bytes + bytes_in"
- },
- "result": "total_bytes",
- "expires": 30
- }
- ]
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/metron/blob/46bc63db/metron-analytics/metron-profiler/src/test/config/zookeeper/readme-example-2/profiler.json
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/config/zookeeper/readme-example-2/profiler.json b/metron-analytics/metron-profiler/src/test/config/zookeeper/readme-example-2/profiler.json
deleted file mode 100644
index e5d8f39..0000000
--- a/metron-analytics/metron-profiler/src/test/config/zookeeper/readme-example-2/profiler.json
+++ /dev/null
@@ -1,18 +0,0 @@
-{
- "profiles": [
- {
- "profile": "example2",
- "foreach": "ip_src_addr",
- "onlyif": "protocol == 'DNS' or protocol == 'HTTP'",
- "init": {
- "num_dns": 1.0,
- "num_http": 1.0
- },
- "update": {
- "num_dns": "num_dns + (if protocol == 'DNS' then 1 else 0)",
- "num_http": "num_http + (if protocol == 'HTTP' then 1 else 0)"
- },
- "result": "num_dns / num_http"
- }
- ]
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/metron/blob/46bc63db/metron-analytics/metron-profiler/src/test/config/zookeeper/readme-example-3/profiler.json
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/config/zookeeper/readme-example-3/profiler.json b/metron-analytics/metron-profiler/src/test/config/zookeeper/readme-example-3/profiler.json
deleted file mode 100644
index 67cdefd..0000000
--- a/metron-analytics/metron-profiler/src/test/config/zookeeper/readme-example-3/profiler.json
+++ /dev/null
@@ -1,11 +0,0 @@
-{
- "profiles": [
- {
- "profile": "example3",
- "foreach": "ip_src_addr",
- "onlyif": "protocol == 'HTTP'",
- "update": { "s": "STATS_ADD(s, length)" },
- "result": "STATS_MEAN(s)"
- }
- ]
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/metron/blob/46bc63db/metron-analytics/metron-profiler/src/test/config/zookeeper/readme-example-4/profiler.json
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/config/zookeeper/readme-example-4/profiler.json b/metron-analytics/metron-profiler/src/test/config/zookeeper/readme-example-4/profiler.json
deleted file mode 100644
index b003ce0..0000000
--- a/metron-analytics/metron-profiler/src/test/config/zookeeper/readme-example-4/profiler.json
+++ /dev/null
@@ -1,11 +0,0 @@
-{
- "profiles": [
- {
- "profile": "example4",
- "foreach": "ip_src_addr",
- "onlyif": "protocol == 'HTTP'",
- "update": { "s": "STATS_ADD(s, length)" },
- "result": "s"
- }
- ]
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/metron/blob/46bc63db/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java
index 78e20e0..3d009fb 100644
--- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java
@@ -48,7 +48,6 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
@@ -67,6 +66,7 @@ public class ProfileBuilderBoltTest extends BaseBoltTest {
private ProfileConfig profile2;
private ProfileMeasurementEmitter emitter;
private ManualFlushSignal flushSignal;
+ private ProfileMeasurement measurement;
@Before
public void setup() throws Exception {
@@ -95,6 +95,12 @@ public class ProfileBuilderBoltTest extends BaseBoltTest {
.withUpdate(Collections.singletonMap("x", "x + 1"))
.withResult("x");
+ measurement = new ProfileMeasurement()
+ .withEntity("entity1")
+ .withProfileName("profile1")
+ .withPeriod(1000, 500, TimeUnit.MILLISECONDS)
+ .withProfileValue(22);
+
flushSignal = new ManualFlushSignal();
flushSignal.setFlushNow(false);
}
@@ -127,23 +133,16 @@ public class ProfileBuilderBoltTest extends BaseBoltTest {
/**
* If the {@code FlushSignal} tells the bolt to flush, it should flush the {@code MessageDistributor}
- * and emit the {@code ProfileMeasurement} values.
+ * and emit the {@code ProfileMeasurement} values from all active profiles.
*/
@Test
- public void testEmitWhenFlush() throws Exception {
+ public void testFlushActiveProfiles() throws Exception {
ProfileBuilderBolt bolt = createBolt();
- // create a profile measurement
- ProfileMeasurement m = new ProfileMeasurement()
- .withEntity("entity1")
- .withProfileName("profile1")
- .withPeriod(1000, 500, TimeUnit.MILLISECONDS)
- .withProfileValue(22);
-
// create a mock that returns the profile measurement above
MessageDistributor distributor = mock(MessageDistributor.class);
- when(distributor.flush()).thenReturn(Collections.singletonList(m));
+ when(distributor.flush()).thenReturn(Collections.singletonList(measurement));
bolt.withMessageDistributor(distributor);
// signal the bolt to flush
@@ -157,30 +156,23 @@ public class ProfileBuilderBoltTest extends BaseBoltTest {
// a profile measurement should be emitted by the bolt
List<ProfileMeasurement> measurements = getProfileMeasurements(outputCollector, 1);
assertEquals(1, measurements.size());
- assertEquals(m, measurements.get(0));
+ assertEquals(measurement, measurements.get(0));
}
/**
* If the {@code FlushSignal} tells the bolt NOT to flush, nothing should be emitted.
*/
@Test
- public void testDoNotEmitWhenNoFlush() throws Exception {
+ public void testDoNotFlushActiveProfiles() throws Exception {
ProfileBuilderBolt bolt = createBolt();
- // create a profile measurement
- ProfileMeasurement m = new ProfileMeasurement()
- .withEntity("entity1")
- .withProfileName("profile1")
- .withPeriod(1000, 500, TimeUnit.MILLISECONDS)
- .withProfileValue(22);
-
- // create a mock that returns the profile measurement above
+ // create a mock where flush() returns the profile measurement above
MessageDistributor distributor = mock(MessageDistributor.class);
- when(distributor.flush()).thenReturn(Collections.singletonList(m));
+ when(distributor.flush()).thenReturn(Collections.singletonList(measurement));
bolt.withMessageDistributor(distributor);
- // no flush signal
+ // there is no flush signal
flushSignal.setFlushNow(false);
// execute the bolt
@@ -193,6 +185,29 @@ public class ProfileBuilderBoltTest extends BaseBoltTest {
}
/**
+ * Expired profiles should be flushed regularly, even if no input telemetry
+ * has been received.
+ */
+ @Test
+ public void testFlushExpiredProfiles() throws Exception {
+
+ ProfileBuilderBolt bolt = createBolt();
+
+ // create a mock where flushExpired() returns the profile measurement above
+ MessageDistributor distributor = mock(MessageDistributor.class);
+ when(distributor.flushExpired()).thenReturn(Collections.singletonList(measurement));
+ bolt.withMessageDistributor(distributor);
+
+ // execute test by flushing expired profiles. this is normally triggered by a timer task.
+ bolt.flushExpired();
+
+ // a profile measurement should be emitted by the bolt
+ List<ProfileMeasurement> measurements = getProfileMeasurements(outputCollector, 1);
+ assertEquals(1, measurements.size());
+ assertEquals(measurement, measurements.get(0));
+ }
+
+ /**
* A {@link ProfileMeasurement} is built for each profile/entity pair. The measurement should be emitted to each
* destination defined by the profile. By default, a profile uses both Kafka and HBase as destinations.
*/
@@ -232,61 +247,6 @@ public class ProfileBuilderBoltTest extends BaseBoltTest {
verify(outputCollector, times(1)).emit(eq("destination3"), any());
}
- @Test
- public void testFlushExpiredWithTick() throws Exception {
-
- ProfileBuilderBolt bolt = createBolt();
-
- // create a mock
- MessageDistributor distributor = mock(MessageDistributor.class);
- bolt.withMessageDistributor(distributor);
-
- // tell the bolt to flush on the first window
- flushSignal.setFlushNow(true);
-
- // execute the bolt; include a tick tuple in the window
- Tuple tuple1 = createTuple("entity", message1, profile1, 100000000L);
- TupleWindow tupleWindow = createWindow(tuple1, mockTickTuple());
- bolt.execute(tupleWindow);
-
- // ensure the expired profiles were flushed when the tick tuple was received
- verify(distributor).flushExpired();
- }
-
- @Test
- public void testFlushExpiredWithNoTick() throws Exception {
-
- ProfileBuilderBolt bolt = createBolt();
-
- // create a mock
- MessageDistributor distributor = mock(MessageDistributor.class);
- bolt.withMessageDistributor(distributor);
-
- // tell the bolt to flush on the first window
- flushSignal.setFlushNow(true);
-
- // execute the bolt; NO tick tuple
- Tuple tuple1 = createTuple("entity", message1, profile1, 100000000L);
- TupleWindow tupleWindow = createWindow(tuple1);
- bolt.execute(tupleWindow);
-
- // there was no tick tuple; the expired profiles should NOT have been flushed
- verify(distributor, times(0)).flushExpired();
- }
-
- /**
- * Creates a mock tick tuple to use for testing.
- * @return A mock tick tuple.
- */
- private Tuple mockTickTuple() {
-
- Tuple tuple = mock(Tuple.class);
- when(tuple.getSourceComponent()).thenReturn("__system");
- when(tuple.getSourceStreamId()).thenReturn("__tick");
-
- return tuple;
- }
-
/**
* Retrieves the ProfileMeasurement(s) (if any) that have been emitted.
*
@@ -334,18 +294,6 @@ public class ProfileBuilderBoltTest extends BaseBoltTest {
*/
private ProfileBuilderBolt createBolt() throws IOException {
- return createBolt(30, TimeUnit.SECONDS);
- }
-
- /**
- * Create a ProfileBuilderBolt to test.
- *
- * @param windowDuration The event window duration.
- * @param windowDurationUnits The units of the event window duration.
- * @return A {@link ProfileBuilderBolt} to test.
- */
- private ProfileBuilderBolt createBolt(int windowDuration, TimeUnit windowDurationUnits) throws IOException {
-
// defines the zk configurations accessible from the bolt
ProfilerConfigurations configurations = new ProfilerConfigurations();
configurations.updateGlobalConfig(Collections.emptyMap());
@@ -359,7 +307,7 @@ public class ProfileBuilderBoltTest extends BaseBoltTest {
.withEmitter(emitter)
.withProfilerConfigurations(configurations)
.withPeriodDuration(1, TimeUnit.MINUTES)
- .withTumblingWindow(new BaseWindowedBolt.Duration(windowDuration, windowDurationUnits));
+ .withTumblingWindow(new BaseWindowedBolt.Duration(30, TimeUnit.SECONDS));
bolt.prepare(new HashMap<>(), topologyContext, outputCollector);
// set the flush signal AFTER calling 'prepare'
http://git-wip-us.apache.org/repos/asf/metron/blob/46bc63db/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
index c48a3e9..8f5ced3 100644
--- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
@@ -20,9 +20,6 @@
package org.apache.metron.profiler.integration;
-import com.google.common.base.Joiner;
-import org.adrianwalker.multilinestring.Multiline;
-import org.apache.commons.math.util.MathUtils;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
@@ -41,10 +38,8 @@ import org.apache.metron.profiler.hbase.ColumnBuilder;
import org.apache.metron.profiler.hbase.RowKeyBuilder;
import org.apache.metron.profiler.hbase.SaltyRowKeyBuilder;
import org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder;
-import org.apache.metron.statistics.OnlineStatisticsProvider;
import org.junit.After;
import org.junit.AfterClass;
-import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -61,6 +56,7 @@ import static com.google.code.tempusfugit.temporal.Timeout.timeout;
import static com.google.code.tempusfugit.temporal.WaitFor.waitOrTimeout;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
/**
* An integration test of the Profiler topology.
@@ -70,247 +66,103 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
private static final String TEST_RESOURCES = "../../metron-analytics/metron-profiler/src/test";
private static final String FLUX_PATH = "../metron-profiler/src/main/flux/profiler/remote.yaml";
- /**
- * {
- * "ip_src_addr": "10.0.0.1",
- * "protocol": "HTTPS",
- * "length": 10,
- * "bytes_in": 234
- * }
- */
- @Multiline
- private static String message1;
-
- /**
- * {
- * "ip_src_addr": "10.0.0.2",
- * "protocol": "HTTP",
- * "length": 20,
- * "bytes_in": 390
- * }
- */
- @Multiline
- private static String message2;
-
- /**
- * {
- * "ip_src_addr": "10.0.0.3",
- * "protocol": "DNS",
- * "length": 30,
- * "bytes_in": 560
- * }
- */
- @Multiline
- private static String message3;
-
- private static ColumnBuilder columnBuilder;
- private static ZKServerComponent zkComponent;
- private static FluxTopologyComponent fluxComponent;
- private static KafkaComponent kafkaComponent;
- private static ConfigUploadComponent configUploadComponent;
- private static ComponentRunner runner;
- private static MockHTable profilerTable;
+ public static final long startAt = 10;
+ public static final String entity = "10.0.0.1";
private static final String tableName = "profiler";
private static final String columnFamily = "P";
- private static final double epsilon = 0.001;
private static final String inputTopic = Constants.INDEXING_TOPIC;
private static final String outputTopic = "profiles";
private static final int saltDivisor = 10;
- private static final long windowLagMillis = TimeUnit.SECONDS.toMillis(5);
+ private static final long windowLagMillis = TimeUnit.SECONDS.toMillis(1);
private static final long windowDurationMillis = TimeUnit.SECONDS.toMillis(5);
- private static final long periodDurationMillis = TimeUnit.SECONDS.toMillis(15);
- private static final long profileTimeToLiveMillis = TimeUnit.SECONDS.toMillis(20);
+ private static final long periodDurationMillis = TimeUnit.SECONDS.toMillis(10);
+ private static final long profileTimeToLiveMillis = TimeUnit.SECONDS.toMillis(15);
private static final long maxRoutesPerBolt = 100000;
- /**
- * Tests the first example contained within the README.
- */
- @Test
- public void testExample1() throws Exception {
-
- uploadConfig(TEST_RESOURCES + "/config/zookeeper/readme-example-1");
-
- // start the topology and write test messages to kafka
- fluxComponent.submitTopology();
- kafkaComponent.writeMessages(inputTopic, message1, message1, message1);
- kafkaComponent.writeMessages(inputTopic, message2, message2, message2);
- kafkaComponent.writeMessages(inputTopic, message3, message3, message3);
-
- // verify - ensure the profile is being persisted
- waitOrTimeout(() -> profilerTable.getPutLog().size() > 0,
- timeout(seconds(180)));
-
- // verify - only 10.0.0.2 sends 'HTTP', thus there should be only 1 value
- List<Double> actuals = read(profilerTable.getPutLog(), columnFamily,
- columnBuilder.getColumnQualifier("value"), Double.class);
-
- // verify - there are 3 'HTTP' each with 390 bytes
- Assert.assertTrue(actuals.stream().anyMatch(val ->
- MathUtils.equals(390.0 * 3, val, epsilon)
- ));
- }
-
- /**
- * Tests the second example contained within the README.
- */
- @Test
- public void testExample2() throws Exception {
-
- uploadConfig(TEST_RESOURCES + "/config/zookeeper/readme-example-2");
-
- // start the topology and write test messages to kafka
- fluxComponent.submitTopology();
- kafkaComponent.writeMessages(inputTopic, message1, message1, message1);
- kafkaComponent.writeMessages(inputTopic, message2, message2, message2);
- kafkaComponent.writeMessages(inputTopic, message3, message3, message3);
-
- // expect 2 values written by the profile; one for 10.0.0.2 and another for 10.0.0.3
- final int expected = 2;
-
- // verify - ensure the profile is being persisted
- waitOrTimeout(() -> profilerTable.getPutLog().size() >= expected,
- timeout(seconds(90)));
-
- // verify - expect 2 results as 2 hosts involved; 10.0.0.2 sends 'HTTP' and 10.0.0.3 send 'DNS'
- List<Double> actuals = read(profilerTable.getPutLog(), columnFamily,
- columnBuilder.getColumnQualifier("value"), Double.class);
-
- // verify - 10.0.0.3 -> 1/4
- Assert.assertTrue( "Could not find a value near 1/4. Actual values read are are: " + Joiner.on(",").join(actuals),
- actuals.stream().anyMatch(val -> MathUtils.equals(val, 1.0/4.0, epsilon)
- ));
-
- // verify - 10.0.0.2 -> 4/1
- Assert.assertTrue("Could not find a value near 4. Actual values read are are: " + Joiner.on(",").join(actuals),
- actuals.stream().anyMatch(val -> MathUtils.equals(val, 4.0/1.0, epsilon)
- ));
- }
-
- /**
- * Tests the third example contained within the README.
- */
- @Test
- public void testExample3() throws Exception {
-
- uploadConfig(TEST_RESOURCES + "/config/zookeeper/readme-example-3");
+ private static ColumnBuilder columnBuilder;
+ private static ZKServerComponent zkComponent;
+ private static FluxTopologyComponent fluxComponent;
+ private static KafkaComponent kafkaComponent;
+ private static ConfigUploadComponent configUploadComponent;
+ private static ComponentRunner runner;
+ private static MockHTable profilerTable;
- // start the topology and write test messages to kafka
- fluxComponent.submitTopology();
- kafkaComponent.writeMessages(inputTopic, message1, message1, message1);
- kafkaComponent.writeMessages(inputTopic, message2, message2, message2);
- kafkaComponent.writeMessages(inputTopic, message3, message3, message3);
-
- // verify - ensure the profile is being persisted
- waitOrTimeout(() -> profilerTable.getPutLog().size() > 0,
- timeout(seconds(90)));
-
- // verify - only 10.0.0.2 sends 'HTTP', thus there should be only 1 value
- List<Double> actuals = read(profilerTable.getPutLog(), columnFamily,
- columnBuilder.getColumnQualifier("value"), Double.class);
-
- // verify - there are 5 'HTTP' messages each with a length of 20, thus the average should be 20
- Assert.assertTrue("Could not find a value near 20. Actual values read are are: " + Joiner.on(",").join(actuals),
- actuals.stream().anyMatch(val -> MathUtils.equals(val, 20.0, epsilon)
- ));
- }
+ private static String message1;
+ private static String message2;
+ private static String message3;
/**
- * Tests the fourth example contained within the README.
+ * The Profiler can generate profiles based on processing time. With processing time,
+ * the Profiler builds profiles based on when the telemetry was processed.
+ *
+ * <p>Not defining a 'timestampField' within the Profiler configuration tells the Profiler
+ * to use processing time.
*/
@Test
- public void testExample4() throws Exception {
+ public void testProcessingTime() throws Exception {
- uploadConfig(TEST_RESOURCES + "/config/zookeeper/readme-example-4");
+ // upload the config to zookeeper
+ uploadConfig(TEST_RESOURCES + "/config/zookeeper/processing-time-test");
// start the topology and write test messages to kafka
fluxComponent.submitTopology();
- kafkaComponent.writeMessages(inputTopic, message1, message1, message1);
- kafkaComponent.writeMessages(inputTopic, message2, message2, message2);
- kafkaComponent.writeMessages(inputTopic, message3, message3, message3);
-
- // verify - ensure the profile is being persisted
- waitOrTimeout(() -> profilerTable.getPutLog().size() > 0,
- timeout(seconds(90)));
-
- // verify - only 10.0.0.2 sends 'HTTP', thus there should be only 1 value
- byte[] column = columnBuilder.getColumnQualifier("value");
- List<OnlineStatisticsProvider> actuals = read(profilerTable.getPutLog(), columnFamily, column, OnlineStatisticsProvider.class);
-
- // verify - there are 5 'HTTP' messages each with a length of 20, thus the average should be 20
- Assert.assertTrue("Could not find a value near 20. Actual values read are are: " + Joiner.on(",").join(actuals),
- actuals.stream().anyMatch(val -> MathUtils.equals(val.getMean(), 20.0, epsilon)
- ));
- }
- @Test
- public void testPercentiles() throws Exception {
-
- uploadConfig(TEST_RESOURCES + "/config/zookeeper/percentiles");
+ // the messages that will be applied to the profile
+ kafkaComponent.writeMessages(inputTopic, message1);
+ kafkaComponent.writeMessages(inputTopic, message2);
+ kafkaComponent.writeMessages(inputTopic, message3);
- // start the topology and write test messages to kafka
- fluxComponent.submitTopology();
- kafkaComponent.writeMessages(inputTopic, message1, message1, message1);
- kafkaComponent.writeMessages(inputTopic, message2, message2, message2);
- kafkaComponent.writeMessages(inputTopic, message3, message3, message3);
+ // storm needs at least one message to close its event window
+ int attempt = 0;
+ while(profilerTable.getPutLog().size() == 0 && attempt++ < 10) {
- // verify - ensure the profile is being persisted
- waitOrTimeout(() -> profilerTable.getPutLog().size() > 0,
- timeout(seconds(90)));
+ // sleep, at least beyond the current window
+ Thread.sleep(windowDurationMillis + windowLagMillis);
- List<Double> actuals = read(profilerTable.getPutLog(), columnFamily,
- columnBuilder.getColumnQualifier("value"), Double.class);
+ // send another message to help close the current event window
+ kafkaComponent.writeMessages(inputTopic, message2);
+ }
- // verify - the 70th percentile of x3, 20s = 20.0
- Assert.assertTrue("Could not find a value near 20. Actual values read are are: " + Joiner.on(",").join(actuals),
- actuals.stream().anyMatch(val -> MathUtils.equals(val, 20.0, epsilon)));
+ // validate what was flushed
+ List<Integer> actuals = read(
+ profilerTable.getPutLog(),
+ columnFamily,
+ columnBuilder.getColumnQualifier("value"),
+ Integer.class);
+ assertEquals(1, actuals.size());
+ assertTrue(actuals.get(0) >= 3);
}
/**
- * The Profiler can optionally perform event time processing. With event time processing,
+ * The Profiler can generate profiles using event time. With event time processing,
* the Profiler uses timestamps contained in the source telemetry.
*
* <p>Defining a 'timestampField' within the Profiler configuration tells the Profiler
* from which field the timestamp should be extracted.
*/
@Test
- public void testEventTimeProcessing() throws Exception {
-
- // constants used for the test
- final long startAt = 10;
- final String entity = "10.0.0.1";
- final String profileName = "event-time-test";
-
- // create some messages that contain a timestamp - a really old timestamp; close to 1970
- String message1 = new MessageBuilder()
- .withField("ip_src_addr", entity)
- .withField("timestamp", startAt)
- .build()
- .toJSONString();
-
- String message2 = new MessageBuilder()
- .withField("ip_src_addr", entity)
- .withField("timestamp", startAt + 100)
- .build()
- .toJSONString();
+ public void testEventTime() throws Exception {
+ // upload the profiler config to zookeeper
uploadConfig(TEST_RESOURCES + "/config/zookeeper/event-time-test");
// start the topology and write test messages to kafka
fluxComponent.submitTopology();
- kafkaComponent.writeMessages(inputTopic, message1, message2);
+ kafkaComponent.writeMessages(inputTopic, message1);
+ kafkaComponent.writeMessages(inputTopic, message2);
+ kafkaComponent.writeMessages(inputTopic, message3);
- // verify - ensure the profile is being persisted
- waitOrTimeout(() -> profilerTable.getPutLog().size() > 0,
- timeout(seconds(90)));
+ // wait until the profile is flushed
+ waitOrTimeout(() -> profilerTable.getPutLog().size() > 0, timeout(seconds(90)));
List<Put> puts = profilerTable.getPutLog();
assertEquals(1, puts.size());
// inspect the row key to ensure the profiler used event time correctly. the timestamp
// embedded in the row key should match those in the source telemetry
- byte[] expectedRowKey = generateExpectedRowKey(profileName, entity, startAt);
+ byte[] expectedRowKey = generateExpectedRowKey("event-time-test", entity, startAt);
byte[] actualRowKey = puts.get(0).getRow();
String msg = String.format("expected '%s', got '%s'",
new String(expectedRowKey, "UTF-8"),
@@ -364,6 +216,26 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
@BeforeClass
public static void setupBeforeClass() throws UnableToStartException {
+
+ // create some messages that contain a timestamp - a really old timestamp; close to 1970
+ message1 = new MessageBuilder()
+ .withField("ip_src_addr", entity)
+ .withField("timestamp", startAt)
+ .build()
+ .toJSONString();
+
+ message2 = new MessageBuilder()
+ .withField("ip_src_addr", entity)
+ .withField("timestamp", startAt + 100)
+ .build()
+ .toJSONString();
+
+ message3 = new MessageBuilder()
+ .withField("ip_src_addr", entity)
+ .withField("timestamp", startAt + (windowDurationMillis * 2))
+ .build()
+ .toJSONString();
+
columnBuilder = new ValueOnlyColumnBuilder(columnFamily);
// storm topology properties
http://git-wip-us.apache.org/repos/asf/metron/blob/46bc63db/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileConfig.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileConfig.java
index 6205fbf..f5b46e6 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileConfig.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileConfig.java
@@ -18,6 +18,8 @@
package org.apache.metron.common.configuration.profiler;
import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.commons.lang.builder.EqualsBuilder;
+import org.apache.commons.lang.builder.HashCodeBuilder;
import java.io.Serializable;
import java.util.ArrayList;
@@ -225,32 +227,39 @@ public class ProfileConfig implements Serializable {
@Override
public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
+ if (this == o) {
+ return true;
+ }
- ProfileConfig that = (ProfileConfig) o;
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
- if (profile != null ? !profile.equals(that.profile) : that.profile != null) return false;
- if (foreach != null ? !foreach.equals(that.foreach) : that.foreach != null) return false;
- if (onlyif != null ? !onlyif.equals(that.onlyif) : that.onlyif != null) return false;
- if (init != null ? !init.equals(that.init) : that.init != null) return false;
- if (update != null ? !update.equals(that.update) : that.update != null) return false;
- if (groupBy != null ? !groupBy.equals(that.groupBy) : that.groupBy != null) return false;
- if (result != null ? !result.equals(that.result) : that.result != null) return false;
- return expires != null ? expires.equals(that.expires) : that.expires == null;
+ ProfileConfig that = (ProfileConfig) o;
+ return new EqualsBuilder()
+ .append(profile, that.profile)
+ .append(foreach, that.foreach)
+ .append(onlyif, that.onlyif)
+ .append(init, that.init)
+ .append(update, that.update)
+ .append(groupBy, that.groupBy)
+ .append(result, that.result)
+ .append(expires, that.expires)
+ .isEquals();
}
@Override
public int hashCode() {
- int result1 = profile != null ? profile.hashCode() : 0;
- result1 = 31 * result1 + (foreach != null ? foreach.hashCode() : 0);
- result1 = 31 * result1 + (onlyif != null ? onlyif.hashCode() : 0);
- result1 = 31 * result1 + (init != null ? init.hashCode() : 0);
- result1 = 31 * result1 + (update != null ? update.hashCode() : 0);
- result1 = 31 * result1 + (groupBy != null ? groupBy.hashCode() : 0);
- result1 = 31 * result1 + (result != null ? result.hashCode() : 0);
- result1 = 31 * result1 + (expires != null ? expires.hashCode() : 0);
- return result1;
+ return new HashCodeBuilder(17, 37)
+ .append(profile)
+ .append(foreach)
+ .append(onlyif)
+ .append(init)
+ .append(update)
+ .append(groupBy)
+ .append(result)
+ .append(expires)
+ .toHashCode();
}
@Override
http://git-wip-us.apache.org/repos/asf/metron/blob/46bc63db/metron-platform/metron-common/src/test/java/org/apache/metron/common/zookeeper/ZKConfigurationsCacheIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/zookeeper/ZKConfigurationsCacheIntegrationTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/zookeeper/ZKConfigurationsCacheIntegrationTest.java
index ec4a98a..5240d7a 100644
--- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/zookeeper/ZKConfigurationsCacheIntegrationTest.java
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/zookeeper/ZKConfigurationsCacheIntegrationTest.java
@@ -154,7 +154,7 @@ public class ZKConfigurationsCacheIntegrationTest {
}
{
//profiler
- byte[] config = IOUtils.toByteArray(new FileInputStream(new File(profilerDir, "/readme-example-1/profiler.json")));
+ byte[] config = IOUtils.toByteArray(new FileInputStream(new File(profilerDir, "/event-time-test/profiler.json")));
ConfigurationsUtils.writeProfilerConfigToZookeeper( config, client);
}
{
@@ -284,7 +284,7 @@ public class ZKConfigurationsCacheIntegrationTest {
}
//profiler
{
- File inFile = new File(profilerDir, "/readme-example-1/profiler.json");
+ File inFile = new File(profilerDir, "/event-time-test/profiler.json");
ProfilerConfig expectedConfig = JSONUtils.INSTANCE.load(inFile, ProfilerConfig.class);
ProfilerConfigurations config = cache.get( ProfilerConfigurations.class);
assertEventually(() -> Assert.assertEquals(expectedConfig, config.getProfilerConfig()));
http://git-wip-us.apache.org/repos/asf/metron/blob/46bc63db/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/bolt/HBaseBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/bolt/HBaseBolt.java b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/bolt/HBaseBolt.java
index d16e2f6..6953b18 100644
--- a/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/bolt/HBaseBolt.java
+++ b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/bolt/HBaseBolt.java
@@ -24,7 +24,7 @@ import java.lang.invoke.MethodHandles;
import java.lang.reflect.InvocationTargetException;
import java.util.Map;
import java.util.Optional;
-import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.metron.hbase.HTableProvider;
@@ -77,6 +77,8 @@ public class HBaseBolt extends BaseRichBolt {
/**
* The name of the class that should be used as a table provider.
+ *
+ * <p>Defaults to 'org.apache.metron.hbase.HTableProvider'.
*/
protected String tableProviderClazzName = "org.apache.metron.hbase.HTableProvider";
@@ -126,6 +128,8 @@ public class HBaseBolt extends BaseRichBolt {
@Override
public Map<String, Object> getComponentConfiguration() {
+ LOG.debug("Tick tuples expected every {} second(s)", flushIntervalSecs);
+
Config conf = new Config();
conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, flushIntervalSecs);
return conf;
@@ -136,7 +140,13 @@ public class HBaseBolt extends BaseRichBolt {
this.collector = collector;
this.batchHelper = new BatchHelper(batchSize, collector);
- TableProvider provider = this.tableProvider == null ?getTableProvider(tableProviderClazzName):this.tableProvider;
+ TableProvider provider;
+ if(this.tableProvider == null) {
+ provider = createTableProvider(tableProviderClazzName);
+ } else {
+ provider = this.tableProvider;
+ }
+
hbaseClient = new HBaseClient(provider, HBaseConfiguration.create(), tableName);
}
@@ -147,6 +157,8 @@ public class HBaseBolt extends BaseRichBolt {
@Override
public void execute(Tuple tuple) {
+ LOG.trace("Received a tuple.");
+
try {
if (batchHelper.shouldHandle(tuple)) {
save(tuple);
@@ -179,12 +191,15 @@ public class HBaseBolt extends BaseRichBolt {
}
batchHelper.addBatch(tuple);
+ LOG.debug("Added mutation to the batch; size={}", batchHelper.getBatchSize());
}
/**
* Flush all saved operations.
*/
private void flush() {
+ LOG.debug("About to flush a batch of {} mutation(s)", batchHelper.getBatchSize());
+
this.hbaseClient.mutate();
batchHelper.ack();
}
@@ -193,7 +208,8 @@ public class HBaseBolt extends BaseRichBolt {
* Creates a TableProvider based on a class name.
* @param connectorImpl The class name of a TableProvider
*/
- private static TableProvider getTableProvider(String connectorImpl) {
+ private static TableProvider createTableProvider(String connectorImpl) {
+ LOG.trace("Creating table provider; className={}", connectorImpl);
// if class name not defined, use a reasonable default
if(StringUtils.isEmpty(connectorImpl) || connectorImpl.charAt(0) == '$') {