You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2016/03/28 07:11:27 UTC
[05/11] storm git commit: STORM-676 addressed review comments
STORM-676 addressed review comments
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2494df61
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2494df61
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2494df61
Branch: refs/heads/master
Commit: 2494df61e2607d7ff8086605234f6f54b796e6af
Parents: 87a2d92
Author: Satish Duggana <sd...@hortonworks.com>
Authored: Sun Mar 13 10:25:21 2016 +0530
Committer: Satish Duggana <sd...@hortonworks.com>
Committed: Wed Mar 23 12:20:55 2016 +0530
----------------------------------------------------------------------
.../trident/TridentHBaseWindowingStoreTopology.java | 6 ++----
.../TridentWindowingInmemoryStoreTopology.java | 7 +++----
.../hbase/trident/windowing/HBaseWindowsStore.java | 6 +++---
.../windowing/AbstractTridentWindowManager.java | 14 +++++++-------
.../windowing/InMemoryTridentWindowManager.java | 10 +++++-----
.../windowing/InMemoryWindowsStoreFactory.java | 13 +++++++++++--
.../windowing/StoreBasedTridentWindowManager.java | 14 +++++++-------
.../trident/windowing/WindowTridentProcessor.java | 15 ++++++++++-----
.../apache/storm/trident/windowing/WindowsState.java | 6 +++---
.../storm/trident/windowing/WindowsStateUpdater.java | 8 ++++----
10 files changed, 55 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/2494df61/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java
index 24cc8d9..0ebaa1f 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java
@@ -23,7 +23,6 @@ import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.hbase.trident.windowing.HBaseWindowsStoreFactory;
-import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.BaseFunction;
@@ -44,13 +43,12 @@ import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.TimeUnit;
/**
*
*/
public class TridentHBaseWindowingStoreTopology {
- private static final Logger log = LoggerFactory.getLogger(TridentHBaseWindowingStoreTopology.class);
+ private static final Logger LOG = LoggerFactory.getLogger(TridentHBaseWindowingStoreTopology.class);
public static StormTopology buildTopology(WindowsStoreFactory windowsStore) throws Exception {
FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"),
@@ -84,7 +82,7 @@ public class TridentHBaseWindowingStoreTopology {
@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
- log.info("##########Echo.execute: " + tuple);
+ LOG.info("##########Echo.execute: " + tuple);
collector.emit(tuple.getValues());
}
http://git-wip-us.apache.org/repos/asf/storm/blob/2494df61/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java
index 5f0cb4f..a2455a0 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java
@@ -25,7 +25,6 @@ import org.apache.storm.generated.StormTopology;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentTopology;
-import org.apache.storm.trident.operation.BaseAggregator;
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.Function;
import org.apache.storm.trident.operation.TridentCollector;
@@ -53,10 +52,10 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
- *
+ * Sample application of trident windowing which uses inmemory store for storing tuples in window.
*/
public class TridentWindowingInmemoryStoreTopology {
- private static final Logger log = LoggerFactory.getLogger(TridentWindowingInmemoryStoreTopology.class);
+ private static final Logger LOG = LoggerFactory.getLogger(TridentWindowingInmemoryStoreTopology.class);
public static StormTopology buildTopology(WindowsStoreFactory windowStore, WindowConfig windowConfig) throws Exception {
FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"),
@@ -91,7 +90,7 @@ public class TridentWindowingInmemoryStoreTopology {
@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
- log.info("##########Echo.execute: " + tuple);
+ LOG.info("##########Echo.execute: " + tuple);
collector.emit(tuple.getValues());
}
http://git-wip-us.apache.org/repos/asf/storm/blob/2494df61/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
index 47879a4..ff3fbf9 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
@@ -51,7 +51,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
*
*/
public class HBaseWindowsStore implements WindowsStore {
- private static final Logger log = LoggerFactory.getLogger(HBaseWindowsStore.class);
+ private static final Logger LOG = LoggerFactory.getLogger(HBaseWindowsStore.class);
public static final String UTF_8 = "utf-8";
private final ThreadLocal<HTable> threadLocalHtable;
@@ -136,7 +136,7 @@ public class HBaseWindowsStore implements WindowsStore {
for (int i=0; i<results.length; i++) {
Result result = results[i];
if(result.isEmpty()) {
- log.error("Got empty result for key [{}]", keys.get(i));
+ LOG.error("Got empty result for key [{}]", keys.get(i));
throw new RuntimeException("Received empty result for key: "+keys.get(i));
}
Input input = new Input(result.getValue(family, qualifier));
@@ -267,7 +267,7 @@ public class HBaseWindowsStore implements WindowsStore {
try {
htable.close();
} catch (IOException e) {
- log.error(e.getMessage(), e);
+ LOG.error(e.getMessage(), e);
}
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/2494df61/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
index 2941e28..fd7a957 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
@@ -47,7 +47,7 @@ import java.util.concurrent.atomic.AtomicInteger;
*
*/
public abstract class AbstractTridentWindowManager<T> implements ITridentWindowManager {
- private static final Logger log = LoggerFactory.getLogger(AbstractTridentWindowManager.class);
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractTridentWindowManager.class);
protected final WindowManager<T> windowManager;
protected final Aggregator aggregator;
@@ -89,12 +89,12 @@ public abstract class AbstractTridentWindowManager<T> implements ITridentWindowM
}
private void preInitialize() {
- log.debug("Getting current trigger count for this component/task");
+ LOG.debug("Getting current trigger count for this component/task");
// get trigger count value from store
Object result = windowStore.get(windowTriggerCountId);
Integer currentCount = 0;
if(result == null) {
- log.info("No current trigger count in windows store.");
+ LOG.info("No current trigger count in windows store.");
} else {
currentCount = (Integer) result + 1;
}
@@ -119,13 +119,13 @@ public abstract class AbstractTridentWindowManager<T> implements ITridentWindowM
@Override
public void onExpiry(List<T> expiredEvents) {
- log.debug("onExpiry is invoked");
+ LOG.debug("onExpiry is invoked");
onTuplesExpired(expiredEvents);
}
@Override
public void onActivation(List<T> events, List<T> newEvents, List<T> expired) {
- log.debug("onActivation is invoked with events size: {}", events.size());
+ LOG.debug("onActivation is invoked with events size: [{}]", events.size());
// trigger occurred, create an aggregation and keep them in store
int currentTriggerId = triggerId.incrementAndGet();
execAggregatorAndStoreResult(currentTriggerId, events);
@@ -230,10 +230,10 @@ public abstract class AbstractTridentWindowManager<T> implements ITridentWindowM
public void shutdown() {
try {
- log.info("window manager [{}] is being shutdown", windowManager);
+ LOG.info("window manager [{}] is being shutdown", windowManager);
windowManager.shutdown();
} finally {
- log.info("window store [{}] is being shutdown", windowStore);
+ LOG.info("window store [{}] is being shutdown", windowStore);
windowStore.shutdown();
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/2494df61/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryTridentWindowManager.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryTridentWindowManager.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryTridentWindowManager.java
index cbb30af..e47cc9a 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryTridentWindowManager.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryTridentWindowManager.java
@@ -33,7 +33,7 @@ import java.util.List;
* This {@code ITridentWindowManager} instance stores all the tuples and trigger related information inmemory.
*/
public class InMemoryTridentWindowManager extends AbstractTridentWindowManager<TridentTuple> {
- private static final Logger log = LoggerFactory.getLogger(InMemoryTridentWindowManager.class);
+ private static final Logger LOG = LoggerFactory.getLogger(InMemoryTridentWindowManager.class);
public InMemoryTridentWindowManager(WindowConfig windowConfig, String windowTaskId, WindowsStore windowStore, Aggregator aggregator,
BatchOutputCollector delegateCollector) {
@@ -42,7 +42,7 @@ public class InMemoryTridentWindowManager extends AbstractTridentWindowManager<T
@Override
protected void initialize() {
- log.debug("noop in initialize");
+ LOG.debug("noop in initialize");
}
@Override
@@ -52,17 +52,17 @@ public class InMemoryTridentWindowManager extends AbstractTridentWindowManager<T
@Override
public void onTuplesExpired(List<TridentTuple> expiredTuples) {
- log.debug("InMemoryTridentWindowManager.onTuplesExpired");
+ LOG.debug("InMemoryTridentWindowManager.onTuplesExpired");
}
public void addTuplesBatch(Object batchId, List<TridentTuple> tuples) {
// check if they are already added then ignore these tuples. This batch is replayed.
if (activeBatches.contains(getBatchTxnId(batchId))) {
- log.info("Ignoring already added tuples with batch: %s", batchId);
+ LOG.info("Ignoring already added tuples with batch: [{}]", batchId);
return;
}
- log.debug("Adding tuples to window-manager for batch: ", batchId);
+ LOG.debug("Adding tuples to window-manager for batch: [{}]", batchId);
for (TridentTuple tridentTuple : tuples) {
windowManager.add(tridentTuple);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/2494df61/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStoreFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStoreFactory.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStoreFactory.java
index 32027a9..cf65594 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStoreFactory.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStoreFactory.java
@@ -18,9 +18,18 @@
*/
package org.apache.storm.trident.windowing;
+import org.apache.storm.trident.operation.Aggregator;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.windowing.config.WindowConfig;
+import org.apache.storm.tuple.Fields;
+
+import java.util.List;
+
/**
- * InMemoryWindowsStoreFactory contains a single instance of {@code InMemoryWindowsStore} which will be used for
- * storing tuples and triggers of the window and successfully emitted triggers can be removed from {@code StateUpdater}.
+ * InMemoryWindowsStoreFactory contains a single instance of {@link InMemoryWindowsStore} which will be used for
+ * storing tuples and triggers of the window. The same InMemoryWindowsStoreFactory instance is passed to {@link WindowsStateUpdater},
+ * which removes successfully emitted triggers from the same {@code inMemoryWindowsStore} instance in
+ * {@link WindowsStateUpdater#updateState(WindowsState, List, TridentCollector)}.
*
*/
public class InMemoryWindowsStoreFactory implements WindowsStoreFactory {
http://git-wip-us.apache.org/repos/asf/storm/blob/2494df61/storm-core/src/jvm/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java
index 885e508..58b24a2 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java
@@ -40,7 +40,7 @@ import java.util.concurrent.atomic.AtomicLong;
*
*/
public class StoreBasedTridentWindowManager extends AbstractTridentWindowManager<TridentBatchTuple> {
- private static final Logger log = LoggerFactory.getLogger(StoreBasedTridentWindowManager.class);
+ private static final Logger LOG = LoggerFactory.getLogger(StoreBasedTridentWindowManager.class);
private static final String TUPLE_PREFIX = "tu" + WindowsStore.KEY_SEPARATOR;
@@ -75,14 +75,14 @@ public class StoreBasedTridentWindowManager extends AbstractTridentWindowManager
if (key.startsWith(windowTupleTaskId)) {
int tupleIndexValue = lastPart(key);
String batchId = secondLastPart(key);
- log.debug("Received tuple with batch [{}] and tuple index [{}]", batchId, tupleIndexValue);
+ LOG.debug("Received tuple with batch [{}] and tuple index [{}]", batchId, tupleIndexValue);
windowManager.add(new TridentBatchTuple(batchId, System.currentTimeMillis(), tupleIndexValue));
} else if (key.startsWith(windowTriggerTaskId)) {
triggerKeys.add(key);
- log.debug("Received trigger with key [{}]", key);
+ LOG.debug("Received trigger with key [{}]", key);
} else if(key.startsWith(windowTriggerInprocessId)) {
attemptedTriggerKeys.add(key);
- log.debug("Received earlier unsuccessful trigger [{}] from windows store [{}]", key);
+ LOG.debug("Received earlier unsuccessful trigger [{}] from windows store [{}]", key);
}
}
@@ -99,7 +99,7 @@ public class StoreBasedTridentWindowManager extends AbstractTridentWindowManager
for (Object triggerObject : triggerObjects) {
int id = lastPart(triggerKeys.get(i++));
if(!triggersToBeIgnored.contains(id)) {
- log.info("Adding pending trigger value [{}]", triggerObject);
+ LOG.info("Adding pending trigger value [{}]", triggerObject);
pendingTriggers.add(new TriggerResult(id, (List<List<Object>>) triggerObject));
}
}
@@ -131,11 +131,11 @@ public class StoreBasedTridentWindowManager extends AbstractTridentWindowManager
public void addTuplesBatch(Object batchId, List<TridentTuple> tuples) {
// check if they are already added then ignore these tuples. This batch is replayed.
if (activeBatches.contains(getBatchTxnId(batchId))) {
- log.info("Ignoring already added tuples with batch: %s", batchId);
+ LOG.info("Ignoring already added tuples with batch: [{}]", batchId);
return;
}
- log.debug("Adding tuples to window-manager for batch: ", batchId);
+ LOG.debug("Adding tuples to window-manager for batch: [{}]", batchId);
List<WindowsStore.Entry> entries = new ArrayList<>();
for (int i = 0; i < tuples.size(); i++) {
String key = keyOf(batchId);
http://git-wip-us.apache.org/repos/asf/storm/blob/2494df61/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java
index c2d9362..8898b13 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java
@@ -46,7 +46,7 @@ import java.util.Queue;
*
*/
public class WindowTridentProcessor implements TridentProcessor {
- private static final Logger log = LoggerFactory.getLogger(WindowTridentProcessor.class);
+ private static final Logger LOG = LoggerFactory.getLogger(WindowTridentProcessor.class);
public static final String TRIGGER_INPROCESS_PREFIX = "tip" + WindowsStore.KEY_SEPARATOR;
public static final String TRIGGER_PREFIX = "tr" + WindowsStore.KEY_SEPARATOR;
@@ -127,8 +127,13 @@ public class WindowTridentProcessor implements TridentProcessor {
@Override
public void cleanup() {
- log.info("shutting down window manager");
- tridentWindowManager.shutdown();
+ LOG.info("shutting down window manager");
+ try {
+ tridentWindowManager.shutdown();
+ } catch (Exception ex) {
+ LOG.error("Error occurred while cleaning up window processor", ex);
+ throw ex;
+ }
}
@Override
@@ -150,7 +155,7 @@ public class WindowTridentProcessor implements TridentProcessor {
Object batchId = processorContext.batchId;
Object batchTxnId = getBatchTxnId(batchId);
- log.debug("Received finishBatch of : {} ", batchId);
+ LOG.debug("Received finishBatch of : [{}] ", batchId);
// get all the tuples in a batch and add it to trident-window-manager
List<TridentTuple> tuples = (List<TridentTuple>) processorContext.state[tridentContext.getStateIndex()];
tridentWindowManager.addTuplesBatch(batchId, tuples);
@@ -171,7 +176,7 @@ public class WindowTridentProcessor implements TridentProcessor {
if(triggerValues == null) {
pendingTriggerIds = new ArrayList<>();
Queue<StoreBasedTridentWindowManager.TriggerResult> pendingTriggers = tridentWindowManager.getPendingTriggers();
- log.debug("pending triggers at batch: {} and triggers.size: {} ", batchId, pendingTriggers.size());
+ LOG.debug("pending triggers at batch: [{}] and triggers.size: [{}] ", batchId, pendingTriggers.size());
try {
Iterator<StoreBasedTridentWindowManager.TriggerResult> pendingTriggersIter = pendingTriggers.iterator();
List<Object> values = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/storm/blob/2494df61/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsState.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsState.java
index 378d24f..faf73d6 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsState.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsState.java
@@ -28,7 +28,7 @@ import org.slf4j.LoggerFactory;
*
*/
public class WindowsState implements State {
- private static final Logger log = LoggerFactory.getLogger(WindowsState.class);
+ private static final Logger LOG = LoggerFactory.getLogger(WindowsState.class);
private Long currentTxId;
@@ -38,12 +38,12 @@ public class WindowsState implements State {
@Override
public void beginCommit(Long txId) {
currentTxId = txId;
- log.debug(" WindowsState.beginCommit:: [{}] ", txId);
+ LOG.debug(" WindowsState.beginCommit:: [{}] ", txId);
}
@Override
public void commit(Long txId) {
- log.debug("WindowsState.commit :: [{}]", txId);
+ LOG.debug("WindowsState.commit :: [{}]", txId);
}
public Long getCurrentTxId() {
http://git-wip-us.apache.org/repos/asf/storm/blob/2494df61/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateUpdater.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateUpdater.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateUpdater.java
index 45ac885..6664b41 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateUpdater.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateUpdater.java
@@ -36,7 +36,7 @@ import java.util.Map;
*/
public class WindowsStateUpdater implements StateUpdater<WindowsState> {
- private static final Logger log = LoggerFactory.getLogger(WindowsStateUpdater.class);
+ private static final Logger LOG = LoggerFactory.getLogger(WindowsStateUpdater.class);
private final WindowsStoreFactory windowStoreFactory;
private WindowsStore windowsStore;
@@ -48,7 +48,7 @@ public class WindowsStateUpdater implements StateUpdater<WindowsState> {
@Override
public void updateState(WindowsState state, List<TridentTuple> tuples, TridentCollector collector) {
Long currentTxId = state.getCurrentTxId();
- log.debug("Removing triggers using WindowStateUpdater, txnId: {} ", currentTxId);
+ LOG.debug("Removing triggers using WindowStateUpdater, txnId: [{}] ", currentTxId);
for (TridentTuple tuple : tuples) {
try {
Object fieldValue = tuple.getValueByField(WindowTridentProcessor.TRIGGER_FIELD_NAME);
@@ -58,11 +58,11 @@ public class WindowsStateUpdater implements StateUpdater<WindowsState> {
WindowTridentProcessor.TriggerInfo triggerInfo = (WindowTridentProcessor.TriggerInfo) fieldValue;
String triggerCompletedKey = WindowTridentProcessor.getWindowTriggerInprocessIdPrefix(triggerInfo.windowTaskId)+currentTxId;
- log.debug("Removing trigger key [{}] and trigger completed key [{}] from store: [{}]", triggerInfo, triggerCompletedKey, windowsStore);
+ LOG.debug("Removing trigger key [{}] and trigger completed key [{}] from store: [{}]", triggerInfo, triggerCompletedKey, windowsStore);
windowsStore.removeAll(Lists.newArrayList(triggerInfo.generateTriggerKey(), triggerCompletedKey));
} catch (Exception ex) {
- log.warn(ex.getMessage());
+ LOG.warn(ex.getMessage());
collector.reportError(ex);
throw new FailedException(ex);
}