You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ot...@apache.org on 2018/04/18 15:00:07 UTC
[37/52] [abbrv] metron git commit: METRON-1518 Build Failure When
Using Profile HDP-2.5.0.0 (nickwallen) closes apache/metron#986
METRON-1518 Build Failure When Using Profile HDP-2.5.0.0 (nickwallen) closes apache/metron#986
Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/ed50d48b
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/ed50d48b
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/ed50d48b
Branch: refs/heads/feature/METRON-1211-extensions-parsers-gradual
Commit: ed50d48bb47cf3f301884f6e18fe4efc8c1b91f1
Parents: a8b555d
Author: nickwallen <ni...@nickallen.org>
Authored: Tue Apr 10 17:16:20 2018 -0400
Committer: nickallen <ni...@apache.org>
Committed: Tue Apr 10 17:16:20 2018 -0400
----------------------------------------------------------------------
.../profiler/bolt/ProfileBuilderBolt.java | 51 +++++---------------
1 file changed, 11 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/metron/blob/ed50d48b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
index fb3d2d0..ca02b58 100644
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
@@ -42,13 +42,11 @@ import org.apache.metron.stellar.common.utils.ConversionUtils;
import org.apache.metron.stellar.dsl.Context;
import org.apache.metron.zookeeper.SimpleEventListener;
import org.apache.metron.zookeeper.ZKCache;
-import org.apache.storm.StormTimer;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.tuple.Tuple;
-import org.apache.storm.utils.Utils;
import org.apache.storm.windowing.TupleWindow;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
@@ -59,9 +57,9 @@ import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-import java.util.function.Supplier;
import static java.lang.String.format;
import static org.apache.metron.profiler.bolt.ProfileSplitterBolt.ENTITY_TUPLE_FIELD;
@@ -155,8 +153,8 @@ public class ProfileBuilderBolt extends BaseWindowedBolt implements Reloadable {
private FlushSignal activeFlushSignal;
/**
- * A timer that flushes expired profiles on a regular interval. The expired profiles
- * are flushed on a separate thread.
+ * An executor that flushes expired profiles at a regular interval on a separate
+ * thread.
*
* <p>Flushing expired profiles ensures that any profiles that stop receiving messages
* for an extended period of time will continue to be flushed.
@@ -164,7 +162,7 @@ public class ProfileBuilderBolt extends BaseWindowedBolt implements Reloadable {
* <p>This introduces concurrency issues as the bolt is no longer single threaded. Due
* to this, all access to the {@code MessageDistributor} needs to be protected.
*/
- private StormTimer expiredFlushTimer;
+ private transient ScheduledExecutorService flushExpiredExecutor;
public ProfileBuilderBolt() {
this.emitters = new ArrayList<>();
@@ -202,7 +200,7 @@ public class ProfileBuilderBolt extends BaseWindowedBolt implements Reloadable {
this.configurations = new ProfilerConfigurations();
this.activeFlushSignal = new FixedFrequencyFlushSignal(periodDurationMillis);
setupZookeeper();
- startExpiredFlushTimer();
+ startFlushingExpiredProfiles();
}
@Override
@@ -210,7 +208,7 @@ public class ProfileBuilderBolt extends BaseWindowedBolt implements Reloadable {
try {
zookeeperCache.close();
zookeeperClient.close();
- expiredFlushTimer.close();
+ flushExpiredExecutor.shutdown();
} catch(Throwable e) {
LOG.error("Exception when cleaning up", e);
@@ -421,39 +419,12 @@ public class ProfileBuilderBolt extends BaseWindowedBolt implements Reloadable {
}
/**
- * Converts milliseconds to seconds and handles an ugly cast.
- *
- * @param millis Duration in milliseconds.
- * @return Duration in seconds.
- */
- private int toSeconds(long millis) {
- return (int) TimeUnit.MILLISECONDS.toSeconds(millis);
- }
-
- /**
- * Creates a timer that regularly flushes expired profiles on a separate thread.
- */
- private void startExpiredFlushTimer() {
-
- expiredFlushTimer = createTimer("flush-expired-profiles-timer");
- expiredFlushTimer.scheduleRecurring(0, toSeconds(profileTimeToLiveMillis), () -> flushExpired());
- }
-
- /**
- * Creates a timer that can execute a task on a fixed interval.
- *
- * <p>If the timer encounters an exception, the entire process will be killed.
- *
- * @param name The name of the timer.
- * @return The timer.
+ * Creates a separate thread that regularly flushes expired profiles.
*/
- private StormTimer createTimer(String name) {
+ private void startFlushingExpiredProfiles() {
- return new StormTimer(name, (thread, exception) -> {
- String msg = String.format("Unexpected exception in timer task; timer=%s", name);
- LOG.error(msg, exception);
- Utils.exitProcess(1, msg);
- });
+ flushExpiredExecutor = Executors.newSingleThreadScheduledExecutor();
+ flushExpiredExecutor.scheduleAtFixedRate(() -> flushExpired(), 0, profileTimeToLiveMillis, TimeUnit.MILLISECONDS);
}
@Override