You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2016/03/28 07:11:27 UTC

[05/11] storm git commit: STORM-676 addressed review comments

STORM-676 addressed review comments


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2494df61
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2494df61
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2494df61

Branch: refs/heads/master
Commit: 2494df61e2607d7ff8086605234f6f54b796e6af
Parents: 87a2d92
Author: Satish Duggana <sd...@hortonworks.com>
Authored: Sun Mar 13 10:25:21 2016 +0530
Committer: Satish Duggana <sd...@hortonworks.com>
Committed: Wed Mar 23 12:20:55 2016 +0530

----------------------------------------------------------------------
 .../trident/TridentHBaseWindowingStoreTopology.java  |  6 ++----
 .../TridentWindowingInmemoryStoreTopology.java       |  7 +++----
 .../hbase/trident/windowing/HBaseWindowsStore.java   |  6 +++---
 .../windowing/AbstractTridentWindowManager.java      | 14 +++++++-------
 .../windowing/InMemoryTridentWindowManager.java      | 10 +++++-----
 .../windowing/InMemoryWindowsStoreFactory.java       | 13 +++++++++++--
 .../windowing/StoreBasedTridentWindowManager.java    | 14 +++++++-------
 .../trident/windowing/WindowTridentProcessor.java    | 15 ++++++++++-----
 .../apache/storm/trident/windowing/WindowsState.java |  6 +++---
 .../storm/trident/windowing/WindowsStateUpdater.java |  8 ++++----
 10 files changed, 55 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/2494df61/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java
index 24cc8d9..0ebaa1f 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentHBaseWindowingStoreTopology.java
@@ -23,7 +23,6 @@ import org.apache.storm.LocalCluster;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.hbase.trident.windowing.HBaseWindowsStoreFactory;
-import org.apache.storm.topology.base.BaseWindowedBolt;
 import org.apache.storm.trident.Stream;
 import org.apache.storm.trident.TridentTopology;
 import org.apache.storm.trident.operation.BaseFunction;
@@ -44,13 +43,12 @@ import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
 
 /**
  *
  */
 public class TridentHBaseWindowingStoreTopology {
-    private static final Logger log = LoggerFactory.getLogger(TridentHBaseWindowingStoreTopology.class);
+    private static final Logger LOG = LoggerFactory.getLogger(TridentHBaseWindowingStoreTopology.class);
 
     public static StormTopology buildTopology(WindowsStoreFactory windowsStore) throws Exception {
         FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"),
@@ -84,7 +82,7 @@ public class TridentHBaseWindowingStoreTopology {
 
         @Override
         public void execute(TridentTuple tuple, TridentCollector collector) {
-            log.info("##########Echo.execute: " + tuple);
+            LOG.info("##########Echo.execute: " + tuple);
             collector.emit(tuple.getValues());
         }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/2494df61/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java
index 5f0cb4f..a2455a0 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java
@@ -25,7 +25,6 @@ import org.apache.storm.generated.StormTopology;
 import org.apache.storm.topology.base.BaseWindowedBolt;
 import org.apache.storm.trident.Stream;
 import org.apache.storm.trident.TridentTopology;
-import org.apache.storm.trident.operation.BaseAggregator;
 import org.apache.storm.trident.operation.BaseFunction;
 import org.apache.storm.trident.operation.Function;
 import org.apache.storm.trident.operation.TridentCollector;
@@ -53,10 +52,10 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 /**
- *
+ * Sample application of Trident windowing which uses an in-memory store for storing tuples in a window.
  */
 public class TridentWindowingInmemoryStoreTopology {
-    private static final Logger log = LoggerFactory.getLogger(TridentWindowingInmemoryStoreTopology.class);
+    private static final Logger LOG = LoggerFactory.getLogger(TridentWindowingInmemoryStoreTopology.class);
 
     public static StormTopology buildTopology(WindowsStoreFactory windowStore, WindowConfig windowConfig) throws Exception {
         FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"),
@@ -91,7 +90,7 @@ public class TridentWindowingInmemoryStoreTopology {
 
         @Override
         public void execute(TridentTuple tuple, TridentCollector collector) {
-            log.info("##########Echo.execute: " + tuple);
+            LOG.info("##########Echo.execute: " + tuple);
             collector.emit(tuple.getValues());
         }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/2494df61/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
index 47879a4..ff3fbf9 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
@@ -51,7 +51,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
  *
  */
 public class HBaseWindowsStore implements WindowsStore {
-    private static final Logger log = LoggerFactory.getLogger(HBaseWindowsStore.class);
+    private static final Logger LOG = LoggerFactory.getLogger(HBaseWindowsStore.class);
     public static final String UTF_8 = "utf-8";
 
     private final ThreadLocal<HTable> threadLocalHtable;
@@ -136,7 +136,7 @@ public class HBaseWindowsStore implements WindowsStore {
         for (int i=0; i<results.length; i++) {
             Result result = results[i];
             if(result.isEmpty()) {
-                log.error("Got empty result for key [{}]", keys.get(i));
+                LOG.error("Got empty result for key [{}]", keys.get(i));
                 throw new RuntimeException("Received empty result for key: "+keys.get(i));
             }
             Input input = new Input(result.getValue(family, qualifier));
@@ -267,7 +267,7 @@ public class HBaseWindowsStore implements WindowsStore {
             try {
                 htable.close();
             } catch (IOException e) {
-                log.error(e.getMessage(), e);
+                LOG.error(e.getMessage(), e);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/2494df61/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
index 2941e28..fd7a957 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
@@ -47,7 +47,7 @@ import java.util.concurrent.atomic.AtomicInteger;
  *
  */
 public abstract class AbstractTridentWindowManager<T> implements ITridentWindowManager {
-    private static final Logger log = LoggerFactory.getLogger(AbstractTridentWindowManager.class);
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractTridentWindowManager.class);
 
     protected final WindowManager<T> windowManager;
     protected final Aggregator aggregator;
@@ -89,12 +89,12 @@ public abstract class AbstractTridentWindowManager<T> implements ITridentWindowM
     }
 
     private void preInitialize() {
-        log.debug("Getting current trigger count for this component/task");
+        LOG.debug("Getting current trigger count for this component/task");
         // get trigger count value from store
         Object result = windowStore.get(windowTriggerCountId);
         Integer currentCount = 0;
         if(result == null) {
-            log.info("No current trigger count in windows store.");
+            LOG.info("No current trigger count in windows store.");
         } else {
             currentCount = (Integer) result + 1;
         }
@@ -119,13 +119,13 @@ public abstract class AbstractTridentWindowManager<T> implements ITridentWindowM
 
         @Override
         public void onExpiry(List<T> expiredEvents) {
-            log.debug("onExpiry is invoked");
+            LOG.debug("onExpiry is invoked");
             onTuplesExpired(expiredEvents);
         }
 
         @Override
         public void onActivation(List<T> events, List<T> newEvents, List<T> expired) {
-            log.debug("onActivation is invoked with events size: {}", events.size());
+            LOG.debug("onActivation is invoked with events size: [{}]", events.size());
             // trigger occurred, create an aggregation and keep them in store
             int currentTriggerId = triggerId.incrementAndGet();
             execAggregatorAndStoreResult(currentTriggerId, events);
@@ -230,10 +230,10 @@ public abstract class AbstractTridentWindowManager<T> implements ITridentWindowM
 
     public void shutdown() {
         try {
-            log.info("window manager [{}] is being shutdown", windowManager);
+            LOG.info("window manager [{}] is being shutdown", windowManager);
             windowManager.shutdown();
         } finally {
-            log.info("window store [{}] is being shutdown", windowStore);
+            LOG.info("window store [{}] is being shutdown", windowStore);
             windowStore.shutdown();
         }
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/2494df61/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryTridentWindowManager.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryTridentWindowManager.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryTridentWindowManager.java
index cbb30af..e47cc9a 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryTridentWindowManager.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryTridentWindowManager.java
@@ -33,7 +33,7 @@ import java.util.List;
  * This {@code ITridentWindowManager} instance stores all the tuples and trigger related information inmemory.
  */
 public class InMemoryTridentWindowManager extends AbstractTridentWindowManager<TridentTuple> {
-    private static final Logger log = LoggerFactory.getLogger(InMemoryTridentWindowManager.class);
+    private static final Logger LOG = LoggerFactory.getLogger(InMemoryTridentWindowManager.class);
 
     public InMemoryTridentWindowManager(WindowConfig windowConfig, String windowTaskId, WindowsStore windowStore, Aggregator aggregator,
                                         BatchOutputCollector delegateCollector) {
@@ -42,7 +42,7 @@ public class InMemoryTridentWindowManager extends AbstractTridentWindowManager<T
 
     @Override
     protected void initialize() {
-        log.debug("noop in initialize");
+        LOG.debug("noop in initialize");
     }
 
     @Override
@@ -52,17 +52,17 @@ public class InMemoryTridentWindowManager extends AbstractTridentWindowManager<T
 
     @Override
     public void onTuplesExpired(List<TridentTuple> expiredTuples) {
-        log.debug("InMemoryTridentWindowManager.onTuplesExpired");
+        LOG.debug("InMemoryTridentWindowManager.onTuplesExpired");
     }
 
     public void addTuplesBatch(Object batchId, List<TridentTuple> tuples) {
         // check if they are already added then ignore these tuples. This batch is replayed.
         if (activeBatches.contains(getBatchTxnId(batchId))) {
-            log.info("Ignoring already added tuples with batch: %s", batchId);
+            LOG.info("Ignoring already added tuples with batch: [{}]", batchId);
             return;
         }
 
-        log.debug("Adding tuples to window-manager for batch: ", batchId);
+        LOG.debug("Adding tuples to window-manager for batch: [{}]", batchId);
         for (TridentTuple tridentTuple : tuples) {
             windowManager.add(tridentTuple);
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/2494df61/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStoreFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStoreFactory.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStoreFactory.java
index 32027a9..cf65594 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStoreFactory.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/InMemoryWindowsStoreFactory.java
@@ -18,9 +18,18 @@
  */
 package org.apache.storm.trident.windowing;
 
+import org.apache.storm.trident.operation.Aggregator;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.windowing.config.WindowConfig;
+import org.apache.storm.tuple.Fields;
+
+import java.util.List;
+
 /**
- * InMemoryWindowsStoreFactory contains a single instance of {@code InMemoryWindowsStore} which will be used for
- * storing tuples and triggers of the window and successfully emitted triggers can be removed from {@code StateUpdater}.
+ * InMemoryWindowsStoreFactory contains a single instance of {@link InMemoryWindowsStore} which will be used for
+ * storing tuples and triggers of the window. The same InMemoryWindowsStoreFactory instance is passed to {@link WindowsStateUpdater},
+ * which removes successfully emitted triggers from the same {@code inMemoryWindowsStore} instance in
+ * {@link WindowsStateUpdater#updateState(WindowsState, List, TridentCollector)}.
  *
  */
 public class InMemoryWindowsStoreFactory implements WindowsStoreFactory {

http://git-wip-us.apache.org/repos/asf/storm/blob/2494df61/storm-core/src/jvm/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java
index 885e508..58b24a2 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java
@@ -40,7 +40,7 @@ import java.util.concurrent.atomic.AtomicLong;
  *
  */
 public class StoreBasedTridentWindowManager extends AbstractTridentWindowManager<TridentBatchTuple> {
-    private static final Logger log = LoggerFactory.getLogger(StoreBasedTridentWindowManager.class);
+    private static final Logger LOG = LoggerFactory.getLogger(StoreBasedTridentWindowManager.class);
 
     private static final String TUPLE_PREFIX = "tu" + WindowsStore.KEY_SEPARATOR;
 
@@ -75,14 +75,14 @@ public class StoreBasedTridentWindowManager extends AbstractTridentWindowManager
             if (key.startsWith(windowTupleTaskId)) {
                 int tupleIndexValue = lastPart(key);
                 String batchId = secondLastPart(key);
-                log.debug("Received tuple with batch [{}] and tuple index [{}]", batchId, tupleIndexValue);
+                LOG.debug("Received tuple with batch [{}] and tuple index [{}]", batchId, tupleIndexValue);
                 windowManager.add(new TridentBatchTuple(batchId, System.currentTimeMillis(), tupleIndexValue));
             } else if (key.startsWith(windowTriggerTaskId)) {
                 triggerKeys.add(key);
-                log.debug("Received trigger with key [{}]", key);
+                LOG.debug("Received trigger with key [{}]", key);
             } else if(key.startsWith(windowTriggerInprocessId)) {
                 attemptedTriggerKeys.add(key);
-                log.debug("Received earlier unsuccessful trigger [{}] from windows store [{}]", key);
+                LOG.debug("Received earlier unsuccessful trigger [{}] from windows store [{}]", key);
             }
         }
 
@@ -99,7 +99,7 @@ public class StoreBasedTridentWindowManager extends AbstractTridentWindowManager
         for (Object triggerObject : triggerObjects) {
             int id = lastPart(triggerKeys.get(i++));
             if(!triggersToBeIgnored.contains(id)) {
-                log.info("Adding pending trigger value [{}]", triggerObject);
+                LOG.info("Adding pending trigger value [{}]", triggerObject);
                 pendingTriggers.add(new TriggerResult(id, (List<List<Object>>) triggerObject));
             }
         }
@@ -131,11 +131,11 @@ public class StoreBasedTridentWindowManager extends AbstractTridentWindowManager
     public void addTuplesBatch(Object batchId, List<TridentTuple> tuples) {
         // check if they are already added then ignore these tuples. This batch is replayed.
         if (activeBatches.contains(getBatchTxnId(batchId))) {
-            log.info("Ignoring already added tuples with batch: %s", batchId);
+            LOG.info("Ignoring already added tuples with batch: [{}]", batchId);
             return;
         }
 
-        log.debug("Adding tuples to window-manager for batch: ", batchId);
+        LOG.debug("Adding tuples to window-manager for batch: [{}]", batchId);
         List<WindowsStore.Entry> entries = new ArrayList<>();
         for (int i = 0; i < tuples.size(); i++) {
             String key = keyOf(batchId);

http://git-wip-us.apache.org/repos/asf/storm/blob/2494df61/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java
index c2d9362..8898b13 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java
@@ -46,7 +46,7 @@ import java.util.Queue;
  *
  */
 public class WindowTridentProcessor implements TridentProcessor {
-    private static final Logger log = LoggerFactory.getLogger(WindowTridentProcessor.class);
+    private static final Logger LOG = LoggerFactory.getLogger(WindowTridentProcessor.class);
 
     public static final String TRIGGER_INPROCESS_PREFIX = "tip" + WindowsStore.KEY_SEPARATOR;
     public static final String TRIGGER_PREFIX = "tr" + WindowsStore.KEY_SEPARATOR;
@@ -127,8 +127,13 @@ public class WindowTridentProcessor implements TridentProcessor {
 
     @Override
     public void cleanup() {
-        log.info("shutting down window manager");
-        tridentWindowManager.shutdown();
+        LOG.info("shutting down window manager");
+        try {
+            tridentWindowManager.shutdown();
+        } catch (Exception ex) {
+            LOG.error("Error occurred while cleaning up window processor", ex);
+            throw ex;
+        }
     }
 
     @Override
@@ -150,7 +155,7 @@ public class WindowTridentProcessor implements TridentProcessor {
         Object batchId = processorContext.batchId;
         Object batchTxnId = getBatchTxnId(batchId);
 
-        log.debug("Received finishBatch of : {} ", batchId);
+        LOG.debug("Received finishBatch of : [{}] ", batchId);
         // get all the tuples in a batch and add it to trident-window-manager
         List<TridentTuple> tuples = (List<TridentTuple>) processorContext.state[tridentContext.getStateIndex()];
         tridentWindowManager.addTuplesBatch(batchId, tuples);
@@ -171,7 +176,7 @@ public class WindowTridentProcessor implements TridentProcessor {
         if(triggerValues == null) {
             pendingTriggerIds = new ArrayList<>();
             Queue<StoreBasedTridentWindowManager.TriggerResult> pendingTriggers = tridentWindowManager.getPendingTriggers();
-            log.debug("pending triggers at batch: {} and triggers.size: {} ", batchId, pendingTriggers.size());
+            LOG.debug("pending triggers at batch: [{}] and triggers.size: [{}] ", batchId, pendingTriggers.size());
             try {
                 Iterator<StoreBasedTridentWindowManager.TriggerResult> pendingTriggersIter = pendingTriggers.iterator();
                 List<Object> values = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/storm/blob/2494df61/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsState.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsState.java
index 378d24f..faf73d6 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsState.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsState.java
@@ -28,7 +28,7 @@ import org.slf4j.LoggerFactory;
  *
  */
 public class WindowsState implements State {
-    private static final Logger log = LoggerFactory.getLogger(WindowsState.class);
+    private static final Logger LOG = LoggerFactory.getLogger(WindowsState.class);
 
     private Long currentTxId;
 
@@ -38,12 +38,12 @@ public class WindowsState implements State {
     @Override
     public void beginCommit(Long txId) {
         currentTxId = txId;
-        log.debug(" WindowsState.beginCommit:: [{}] ", txId);
+        LOG.debug(" WindowsState.beginCommit:: [{}] ", txId);
     }
 
     @Override
     public void commit(Long txId) {
-        log.debug("WindowsState.commit :: [{}]", txId);
+        LOG.debug("WindowsState.commit :: [{}]", txId);
     }
 
     public Long getCurrentTxId() {

http://git-wip-us.apache.org/repos/asf/storm/blob/2494df61/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateUpdater.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateUpdater.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateUpdater.java
index 45ac885..6664b41 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateUpdater.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowsStateUpdater.java
@@ -36,7 +36,7 @@ import java.util.Map;
  */
 public class WindowsStateUpdater implements StateUpdater<WindowsState> {
 
-    private static final Logger log = LoggerFactory.getLogger(WindowsStateUpdater.class);
+    private static final Logger LOG = LoggerFactory.getLogger(WindowsStateUpdater.class);
 
     private final WindowsStoreFactory windowStoreFactory;
     private WindowsStore windowsStore;
@@ -48,7 +48,7 @@ public class WindowsStateUpdater implements StateUpdater<WindowsState> {
     @Override
     public void updateState(WindowsState state, List<TridentTuple> tuples, TridentCollector collector) {
         Long currentTxId = state.getCurrentTxId();
-        log.debug("Removing triggers using WindowStateUpdater, txnId: {} ", currentTxId);
+        LOG.debug("Removing triggers using WindowStateUpdater, txnId: [{}] ", currentTxId);
         for (TridentTuple tuple : tuples) {
             try {
                 Object fieldValue = tuple.getValueByField(WindowTridentProcessor.TRIGGER_FIELD_NAME);
@@ -58,11 +58,11 @@ public class WindowsStateUpdater implements StateUpdater<WindowsState> {
                 WindowTridentProcessor.TriggerInfo triggerInfo = (WindowTridentProcessor.TriggerInfo) fieldValue;
                 String triggerCompletedKey = WindowTridentProcessor.getWindowTriggerInprocessIdPrefix(triggerInfo.windowTaskId)+currentTxId;
 
-                log.debug("Removing trigger key [{}] and trigger completed key [{}] from store: [{}]", triggerInfo, triggerCompletedKey, windowsStore);
+                LOG.debug("Removing trigger key [{}] and trigger completed key [{}] from store: [{}]", triggerInfo, triggerCompletedKey, windowsStore);
 
                 windowsStore.removeAll(Lists.newArrayList(triggerInfo.generateTriggerKey(), triggerCompletedKey));
             } catch (Exception ex) {
-                log.warn(ex.getMessage());
+                LOG.warn(ex.getMessage());
                 collector.reportError(ex);
                 throw new FailedException(ex);
             }