You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2016/03/28 08:07:18 UTC

[6/9] storm git commit: STORM-676 Addressed review comments

STORM-676 Addressed review comments


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b08d7eaf
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b08d7eaf
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b08d7eaf

Branch: refs/heads/1.x-branch
Commit: b08d7eaf7099e4da74010501a189818ec11b00bc
Parents: 3a96f20
Author: Satish Duggana <sd...@hortonworks.com>
Authored: Wed Mar 23 19:03:55 2016 +0530
Committer: Satish Duggana <sd...@hortonworks.com>
Committed: Sun Mar 27 10:47:03 2016 +0530

----------------------------------------------------------------------
 .../trident/windowing/HBaseWindowsStore.java    |  2 +-
 .../windowing/HBaseWindowsStoreFactory.java     |  1 +
 .../jvm/org/apache/storm/trident/Stream.java    | 22 +++++++++++---------
 .../windowing/WindowTridentProcessor.java       |  2 +-
 4 files changed, 15 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b08d7eaf/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
index ff3fbf9..b300ed6 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
@@ -55,7 +55,7 @@ public class HBaseWindowsStore implements WindowsStore {
     public static final String UTF_8 = "utf-8";
 
     private final ThreadLocal<HTable> threadLocalHtable;
-    private Queue<HTable> htables = new ConcurrentLinkedQueue<>();
+    private final Queue<HTable> htables = new ConcurrentLinkedQueue<>();
     private final byte[] family;
     private final byte[] qualifier;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/b08d7eaf/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java
index a49bc87..a47d5fb 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java
@@ -26,6 +26,7 @@ import org.apache.storm.trident.windowing.WindowsStoreFactory;
 import java.util.Map;
 
 /**
+ * Factory to create {@link HBaseWindowsStore} instances.
  *
  */
 public class HBaseWindowsStoreFactory implements WindowsStoreFactory {

http://git-wip-us.apache.org/repos/asf/storm/blob/b08d7eaf/storm-core/src/jvm/org/apache/storm/trident/Stream.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/Stream.java b/storm-core/src/jvm/org/apache/storm/trident/Stream.java
index 444b42a..23ac34a 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/Stream.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/Stream.java
@@ -611,13 +611,13 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
     /**
      * Returns a stream of tuples which are aggregated results of a tumbling window with every {@code windowCount} of tuples.
      *
-     * @param windowCount represents no of tuples in the window
+     * @param windowCount represents the number of tuples in the window
      * @param windowStoreFactory intermediary tuple store for storing windowing tuples
      * @param inputFields projected fields for aggregator
      * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
      * @param functionFields fields of values to emit with aggregation.
      *
-     * @return
+     * @return the new stream with this operation.
      */
     public Stream tumblingWindow(int windowCount, WindowsStoreFactory windowStoreFactory,
                                       Fields inputFields, Aggregator aggregator, Fields functionFields) {
@@ -626,7 +626,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
 
     /**
      * Returns a stream of tuples which are aggregated results of a sliding window with every {@code windowCount} of tuples
-     * and slides the window with {@code slideCount}.
+     * and slides the window after every {@code slideCount} tuples.
      *
      * @param windowCount represents tuples count of a window
      * @param slideCount the number of tuples after which the window slides
@@ -635,7 +635,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
      * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
      * @param functionFields fields of values to emit with aggregation.
      *
-     * @return
+     * @return the new stream with this operation.
      */
     public Stream slidingWindow(int windowCount, int slideCount, WindowsStoreFactory windowStoreFactory,
                                      Fields inputFields, Aggregator aggregator, Fields functionFields) {
@@ -643,7 +643,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
     }
 
     /**
-     * Returns a stream of tuples which are aggregated results of a window tumbles at duration of {@code windowDuration}
+     * Returns a stream of tuples which are aggregated results of a window that tumbles every {@code windowDuration}.
      *
      * @param windowDuration represents tumbling window duration configuration
      * @param windowStoreFactory intermediary tuple store for storing windowing tuples
@@ -651,7 +651,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
      * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
      * @param functionFields fields of values to emit with aggregation.
      *
-     * @return
+     * @return the new stream with this operation.
      */
     public Stream tumblingWindow(BaseWindowedBolt.Duration windowDuration, WindowsStoreFactory windowStoreFactory,
                                      Fields inputFields, Aggregator aggregator, Fields functionFields) {
@@ -659,7 +659,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
     }
 
     /**
-     * Returns a stream of tuples which are aggregated results of a window which slides at duration of {@code slideDuration}
+     * Returns a stream of tuples which are aggregated results of a window which slides every {@code slidingInterval}
      * and completes a window at {@code windowDuration}
      *
      * @param windowDuration represents window duration configuration
@@ -669,7 +669,7 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
      * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
      * @param functionFields fields of values to emit with aggregation.
      *
-     * @return
+     * @return the new stream with this operation.
      */
     public Stream slidingWindow(BaseWindowedBolt.Duration windowDuration, BaseWindowedBolt.Duration slidingInterval,
                                     WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields) {
@@ -683,7 +683,8 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
      * @param inputFields input fields
      * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
      * @param functionFields fields of values to emit with aggregation.
-     * @return
+     *
+     * @return the new stream with this operation.
      */
     public Stream window(WindowConfig windowConfig, Fields inputFields, Aggregator aggregator, Fields functionFields) {
         // this store is used only for storing triggered aggregated results but not tuples as storeTuplesInStore is set
@@ -700,7 +701,8 @@ public class Stream implements IAggregatableStream, ResourceDeclarer<Stream> {
      * @param inputFields input fields
      * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
      * @param functionFields fields of values to emit with aggregation.
-     * @return
+     *
+     * @return the new stream with this operation.
      */
     public Stream window(WindowConfig windowConfig, WindowsStoreFactory windowStoreFactory, Fields inputFields,
                          Aggregator aggregator, Fields functionFields) {

http://git-wip-us.apache.org/repos/asf/storm/blob/b08d7eaf/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java
index 8898b13..5125e41 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java
@@ -53,7 +53,7 @@ public class WindowTridentProcessor implements TridentProcessor {
     public static final String TRIGGER_COUNT_PREFIX = "tc" + WindowsStore.KEY_SEPARATOR;
 
     public static final String TRIGGER_FIELD_NAME = "_task_info";
-    public static final long DEFAULT_INMEMORY_TUPLE_CACHE_LIMIT = 100l;
+    public static final long DEFAULT_INMEMORY_TUPLE_CACHE_LIMIT = 100L;
 
     private final String windowId;
     private final Fields inputFields;