You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2016/03/28 07:11:29 UTC

[07/11] storm git commit: STORM-676 Refactoring of WindowConfig APIs

STORM-676 Refactoring of WindowConfig APIs


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1bac2a9a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1bac2a9a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1bac2a9a

Branch: refs/heads/master
Commit: 1bac2a9a080d94286cc3ef2d21fa507ca5140528
Parents: 5da6bff
Author: Satish Duggana <sd...@hortonworks.com>
Authored: Wed Mar 23 22:35:08 2016 +0530
Committer: Satish Duggana <sd...@hortonworks.com>
Committed: Wed Mar 23 22:35:08 2016 +0530

----------------------------------------------------------------------
 .../windowing/AbstractTridentWindowManager.java |  4 +-
 .../windowing/config/SlidingCountWindow.java    |  7 ++-
 .../windowing/config/SlidingDurationWindow.java |  6 +-
 .../windowing/config/TumblingCountWindow.java   |  8 ++-
 .../config/TumblingDurationWindow.java          |  6 +-
 .../trident/windowing/config/WindowConfig.java  |  4 +-
 .../strategy/WindowStrategyFactory.java         | 60 --------------------
 .../storm/trident/TridentWindowingTest.java     | 25 ++++----
 8 files changed, 33 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/1bac2a9a/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
index aac18d3..f93527a 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
@@ -25,7 +25,6 @@ import org.apache.storm.trident.operation.TridentCollector;
 import org.apache.storm.trident.tuple.TridentTuple;
 import org.apache.storm.trident.windowing.config.WindowConfig;
 import org.apache.storm.trident.windowing.strategy.WindowStrategy;
-import org.apache.storm.trident.windowing.strategy.WindowStrategyFactory;
 import org.apache.storm.windowing.EvictionPolicy;
 import org.apache.storm.windowing.TriggerPolicy;
 import org.apache.storm.windowing.WindowLifecycleListener;
@@ -34,7 +33,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Queue;
 import java.util.Set;
@@ -71,7 +69,7 @@ public abstract class AbstractTridentWindowManager<T> implements ITridentWindowM
 
         windowManager = new WindowManager<>(new TridentWindowLifeCycleListener());
 
-        WindowStrategy<T> windowStrategy = WindowStrategyFactory.create(windowConfig);
+        WindowStrategy<T> windowStrategy = windowConfig.getWindowStrategy();
         EvictionPolicy<T> evictionPolicy = windowStrategy.getEvictionPolicy();
         windowManager.setEvictionPolicy(evictionPolicy);
         triggerPolicy = windowStrategy.getTriggerPolicy(windowManager, evictionPolicy);

http://git-wip-us.apache.org/repos/asf/storm/blob/1bac2a9a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingCountWindow.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingCountWindow.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingCountWindow.java
index a0dd13c..2e2d388 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingCountWindow.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingCountWindow.java
@@ -18,6 +18,9 @@
  */
 package org.apache.storm.trident.windowing.config;
 
+import org.apache.storm.trident.windowing.strategy.SlidingCountWindowStrategy;
+import org.apache.storm.trident.windowing.strategy.WindowStrategy;
+
 /**
  * Represents configuration of sliding window based on count of events. Window of length {@code windowLength} slides
  * at every count of given {@code slideLength}
@@ -30,8 +33,8 @@ public final class SlidingCountWindow extends BaseWindowConfig {
     }
 
     @Override
-    public Type getWindowType() {
-        return Type.SLIDING_COUNT;
+    public <T> WindowStrategy<T> getWindowStrategy() {
+        return new SlidingCountWindowStrategy<>(this);
     }
 
     public static SlidingCountWindow of(int windowCount, int slidingCount) {

http://git-wip-us.apache.org/repos/asf/storm/blob/1bac2a9a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingDurationWindow.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingDurationWindow.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingDurationWindow.java
index f2fe291..befd4e3 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingDurationWindow.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingDurationWindow.java
@@ -19,6 +19,8 @@
 package org.apache.storm.trident.windowing.config;
 
 import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.apache.storm.trident.windowing.strategy.SlidingDurationWindowStrategy;
+import org.apache.storm.trident.windowing.strategy.WindowStrategy;
 
 /**
  * Represents configuration of sliding window based on duration. Window duration of {@code windowLength} slides
@@ -32,8 +34,8 @@ public final class SlidingDurationWindow extends BaseWindowConfig {
     }
 
     @Override
-    public Type getWindowType() {
-        return Type.SLIDING_DURATION;
+    public <T> WindowStrategy<T> getWindowStrategy() {
+        return new SlidingDurationWindowStrategy<>(this);
     }
 
     public static SlidingDurationWindow of(BaseWindowedBolt.Duration windowDuration, BaseWindowedBolt.Duration slidingDuration) {

http://git-wip-us.apache.org/repos/asf/storm/blob/1bac2a9a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingCountWindow.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingCountWindow.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingCountWindow.java
index a5f3528..1988850 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingCountWindow.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingCountWindow.java
@@ -18,6 +18,10 @@
  */
 package org.apache.storm.trident.windowing.config;
 
+import org.apache.storm.trident.windowing.strategy.SlidingDurationWindowStrategy;
+import org.apache.storm.trident.windowing.strategy.TumblingCountWindowStrategy;
+import org.apache.storm.trident.windowing.strategy.WindowStrategy;
+
 /**
  * Represents tumbling count window configuration. Window tumbles at each given {@code windowLength} count of events.
  */
@@ -28,8 +32,8 @@ public final class TumblingCountWindow extends BaseWindowConfig {
     }
 
     @Override
-    public Type getWindowType() {
-        return Type.TUMBLING_COUNT;
+    public <T> WindowStrategy<T> getWindowStrategy() {
+        return new TumblingCountWindowStrategy<>(this);
     }
 
     public static TumblingCountWindow of(int windowLength) {

http://git-wip-us.apache.org/repos/asf/storm/blob/1bac2a9a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingDurationWindow.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingDurationWindow.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingDurationWindow.java
index 8beb68d..3881a74 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingDurationWindow.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingDurationWindow.java
@@ -19,6 +19,8 @@
 package org.apache.storm.trident.windowing.config;
 
 import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.apache.storm.trident.windowing.strategy.TumblingDurationWindowStrategy;
+import org.apache.storm.trident.windowing.strategy.WindowStrategy;
 
 /**
  * Represents tumbling duration window configuration. Window tumbles every given {@code windowLength} duration.
@@ -30,8 +32,8 @@ public final class TumblingDurationWindow extends BaseWindowConfig {
     }
 
     @Override
-    public Type getWindowType() {
-        return Type.TUMBLING_DURATION;
+    public <T> WindowStrategy<T> getWindowStrategy() {
+        return new TumblingDurationWindowStrategy<>(this);
     }
 
     public static TumblingDurationWindow of(BaseWindowedBolt.Duration windowLength) {

http://git-wip-us.apache.org/repos/asf/storm/blob/1bac2a9a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/WindowConfig.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/WindowConfig.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/WindowConfig.java
index 49347e7..7cb78ee 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/WindowConfig.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/WindowConfig.java
@@ -18,6 +18,8 @@
  */
 package org.apache.storm.trident.windowing.config;
 
+import org.apache.storm.trident.windowing.strategy.WindowStrategy;
+
 import java.io.Serializable;
 
 /**
@@ -42,7 +44,7 @@ public interface WindowConfig extends Serializable {
      *
      * @return
      */
-    public Type getWindowType();
+    public <T> WindowStrategy<T> getWindowStrategy();
 
     public void validate();
 

http://git-wip-us.apache.org/repos/asf/storm/blob/1bac2a9a/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/WindowStrategyFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/WindowStrategyFactory.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/WindowStrategyFactory.java
deleted file mode 100644
index d8a3918..0000000
--- a/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/WindowStrategyFactory.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.trident.windowing.strategy;
-
-import org.apache.storm.trident.windowing.config.WindowConfig;
-
-/**
- *
- */
-public final class WindowStrategyFactory {
-
-    private WindowStrategyFactory() {
-    }
-
-    /**
-     * Creates a {@code WindowStrategy} instance based on the given {@code windowConfig}.
-     *
-     * @param windowConfig
-     * @return
-     */
-    public static <T> WindowStrategy<T> create(WindowConfig windowConfig) {
-        WindowStrategy<T> windowStrategy = null;
-        WindowConfig.Type windowType = windowConfig.getWindowType();
-        switch(windowType) {
-            case SLIDING_COUNT:
-                windowStrategy = new SlidingCountWindowStrategy<>(windowConfig);
-                break;
-            case TUMBLING_COUNT:
-                windowStrategy = new TumblingCountWindowStrategy<>(windowConfig);
-                break;
-            case SLIDING_DURATION:
-                windowStrategy = new SlidingDurationWindowStrategy<>(windowConfig);
-                break;
-            case TUMBLING_DURATION:
-                windowStrategy = new TumblingDurationWindowStrategy<>(windowConfig);
-                break;
-            default:
-                throw new IllegalArgumentException("Given WindowConfig of type "+windowType+" is not supported");
-        }
-
-        return windowStrategy;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/1bac2a9a/storm-core/test/jvm/org/apache/storm/trident/TridentWindowingTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/trident/TridentWindowingTest.java b/storm-core/test/jvm/org/apache/storm/trident/TridentWindowingTest.java
index 03f298d..4b82b89 100644
--- a/storm-core/test/jvm/org/apache/storm/trident/TridentWindowingTest.java
+++ b/storm-core/test/jvm/org/apache/storm/trident/TridentWindowingTest.java
@@ -24,15 +24,8 @@ import org.apache.storm.trident.windowing.config.SlidingCountWindow;
 import org.apache.storm.trident.windowing.config.SlidingDurationWindow;
 import org.apache.storm.trident.windowing.config.TumblingCountWindow;
 import org.apache.storm.trident.windowing.config.TumblingDurationWindow;
-import org.apache.storm.trident.windowing.strategy.SlidingCountWindowStrategy;
-import org.apache.storm.trident.windowing.strategy.SlidingDurationWindowStrategy;
-import org.apache.storm.trident.windowing.strategy.TumblingCountWindowStrategy;
-import org.apache.storm.trident.windowing.strategy.TumblingDurationWindowStrategy;
-import org.apache.storm.trident.windowing.strategy.WindowStrategy;
-import org.apache.storm.trident.windowing.strategy.WindowStrategyFactory;
-import org.junit.After;
+import org.apache.storm.trident.windowing.strategy.*;
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Test;
 
 import java.util.concurrent.TimeUnit;
@@ -45,19 +38,21 @@ public class TridentWindowingTest {
     @Test
     public void testWindowStrategyInstances() throws Exception {
 
-        WindowStrategy<Object> tumblingCountStrategy = WindowStrategyFactory.create(TumblingCountWindow.of(10));
+        WindowStrategy<Object> tumblingCountStrategy = TumblingCountWindow.of(10).getWindowStrategy();
         Assert.assertTrue(tumblingCountStrategy instanceof TumblingCountWindowStrategy);
 
-        WindowStrategy<Object> slidingCountStrategy = WindowStrategyFactory.create(SlidingCountWindow.of(100, 10));
+        WindowStrategy<Object> slidingCountStrategy = SlidingCountWindow.of(100, 10).getWindowStrategy();
         Assert.assertTrue(slidingCountStrategy instanceof SlidingCountWindowStrategy);
 
-        WindowStrategy<Object> tumblingDurationStrategy = WindowStrategyFactory.create(
-                TumblingDurationWindow.of(new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS)));
+        WindowStrategy<Object> tumblingDurationStrategy = TumblingDurationWindow.of(
+                                                            new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS))
+                                                            .getWindowStrategy();
         Assert.assertTrue(tumblingDurationStrategy instanceof TumblingDurationWindowStrategy);
 
-        WindowStrategy<Object> slidingDurationStrategy = WindowStrategyFactory.create(
-                SlidingDurationWindow.of(new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS),
-                        new BaseWindowedBolt.Duration(2, TimeUnit.SECONDS)));
+        WindowStrategy<Object> slidingDurationStrategy = SlidingDurationWindow.of(
+                                                            new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS),
+                                                            new BaseWindowedBolt.Duration(2, TimeUnit.SECONDS))
+                                                            .getWindowStrategy();
         Assert.assertTrue(slidingDurationStrategy instanceof SlidingDurationWindowStrategy);
     }