You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2016/03/28 07:11:29 UTC
[07/11] storm git commit: STORM-676 Refactoring of WindowConfig APIs
STORM-676 Refactoring of WindowConfig APIs
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1bac2a9a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1bac2a9a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1bac2a9a
Branch: refs/heads/master
Commit: 1bac2a9a080d94286cc3ef2d21fa507ca5140528
Parents: 5da6bff
Author: Satish Duggana <sd...@hortonworks.com>
Authored: Wed Mar 23 22:35:08 2016 +0530
Committer: Satish Duggana <sd...@hortonworks.com>
Committed: Wed Mar 23 22:35:08 2016 +0530
----------------------------------------------------------------------
.../windowing/AbstractTridentWindowManager.java | 4 +-
.../windowing/config/SlidingCountWindow.java | 7 ++-
.../windowing/config/SlidingDurationWindow.java | 6 +-
.../windowing/config/TumblingCountWindow.java | 8 ++-
.../config/TumblingDurationWindow.java | 6 +-
.../trident/windowing/config/WindowConfig.java | 4 +-
.../strategy/WindowStrategyFactory.java | 60 --------------------
.../storm/trident/TridentWindowingTest.java | 25 ++++----
8 files changed, 33 insertions(+), 87 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/1bac2a9a/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
index aac18d3..f93527a 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
@@ -25,7 +25,6 @@ import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.trident.windowing.config.WindowConfig;
import org.apache.storm.trident.windowing.strategy.WindowStrategy;
-import org.apache.storm.trident.windowing.strategy.WindowStrategyFactory;
import org.apache.storm.windowing.EvictionPolicy;
import org.apache.storm.windowing.TriggerPolicy;
import org.apache.storm.windowing.WindowLifecycleListener;
@@ -34,7 +33,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;
@@ -71,7 +69,7 @@ public abstract class AbstractTridentWindowManager<T> implements ITridentWindowM
windowManager = new WindowManager<>(new TridentWindowLifeCycleListener());
- WindowStrategy<T> windowStrategy = WindowStrategyFactory.create(windowConfig);
+ WindowStrategy<T> windowStrategy = windowConfig.getWindowStrategy();
EvictionPolicy<T> evictionPolicy = windowStrategy.getEvictionPolicy();
windowManager.setEvictionPolicy(evictionPolicy);
triggerPolicy = windowStrategy.getTriggerPolicy(windowManager, evictionPolicy);
http://git-wip-us.apache.org/repos/asf/storm/blob/1bac2a9a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingCountWindow.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingCountWindow.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingCountWindow.java
index a0dd13c..2e2d388 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingCountWindow.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingCountWindow.java
@@ -18,6 +18,9 @@
*/
package org.apache.storm.trident.windowing.config;
+import org.apache.storm.trident.windowing.strategy.SlidingCountWindowStrategy;
+import org.apache.storm.trident.windowing.strategy.WindowStrategy;
+
/**
* Represents configuration of sliding window based on count of events. Window of length {@code windowLength} slides
* at every count of given {@code slideLength}
@@ -30,8 +33,8 @@ public final class SlidingCountWindow extends BaseWindowConfig {
}
@Override
- public Type getWindowType() {
- return Type.SLIDING_COUNT;
+ public <T> WindowStrategy<T> getWindowStrategy() {
+ return new SlidingCountWindowStrategy<>(this);
}
public static SlidingCountWindow of(int windowCount, int slidingCount) {
http://git-wip-us.apache.org/repos/asf/storm/blob/1bac2a9a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingDurationWindow.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingDurationWindow.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingDurationWindow.java
index f2fe291..befd4e3 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingDurationWindow.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/SlidingDurationWindow.java
@@ -19,6 +19,8 @@
package org.apache.storm.trident.windowing.config;
import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.apache.storm.trident.windowing.strategy.SlidingDurationWindowStrategy;
+import org.apache.storm.trident.windowing.strategy.WindowStrategy;
/**
* Represents configuration of sliding window based on duration. Window duration of {@code windowLength} slides
@@ -32,8 +34,8 @@ public final class SlidingDurationWindow extends BaseWindowConfig {
}
@Override
- public Type getWindowType() {
- return Type.SLIDING_DURATION;
+ public <T> WindowStrategy<T> getWindowStrategy() {
+ return new SlidingDurationWindowStrategy<>(this);
}
public static SlidingDurationWindow of(BaseWindowedBolt.Duration windowDuration, BaseWindowedBolt.Duration slidingDuration) {
http://git-wip-us.apache.org/repos/asf/storm/blob/1bac2a9a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingCountWindow.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingCountWindow.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingCountWindow.java
index a5f3528..1988850 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingCountWindow.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingCountWindow.java
@@ -18,6 +18,10 @@
*/
package org.apache.storm.trident.windowing.config;
+import org.apache.storm.trident.windowing.strategy.SlidingDurationWindowStrategy;
+import org.apache.storm.trident.windowing.strategy.TumblingCountWindowStrategy;
+import org.apache.storm.trident.windowing.strategy.WindowStrategy;
+
/**
* Represents tumbling count window configuration. Window tumbles at each given {@code windowLength} count of events.
*/
@@ -28,8 +32,8 @@ public final class TumblingCountWindow extends BaseWindowConfig {
}
@Override
- public Type getWindowType() {
- return Type.TUMBLING_COUNT;
+ public <T> WindowStrategy<T> getWindowStrategy() {
+ return new TumblingCountWindowStrategy<>(this);
}
public static TumblingCountWindow of(int windowLength) {
http://git-wip-us.apache.org/repos/asf/storm/blob/1bac2a9a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingDurationWindow.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingDurationWindow.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingDurationWindow.java
index 8beb68d..3881a74 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingDurationWindow.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/TumblingDurationWindow.java
@@ -19,6 +19,8 @@
package org.apache.storm.trident.windowing.config;
import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.apache.storm.trident.windowing.strategy.TumblingDurationWindowStrategy;
+import org.apache.storm.trident.windowing.strategy.WindowStrategy;
/**
* Represents tumbling duration window configuration. Window tumbles every given {@code windowLength} duration.
@@ -30,8 +32,8 @@ public final class TumblingDurationWindow extends BaseWindowConfig {
}
@Override
- public Type getWindowType() {
- return Type.TUMBLING_DURATION;
+ public <T> WindowStrategy<T> getWindowStrategy() {
+ return new TumblingDurationWindowStrategy<>(this);
}
public static TumblingDurationWindow of(BaseWindowedBolt.Duration windowLength) {
http://git-wip-us.apache.org/repos/asf/storm/blob/1bac2a9a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/WindowConfig.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/WindowConfig.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/WindowConfig.java
index 49347e7..7cb78ee 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/windowing/config/WindowConfig.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/windowing/config/WindowConfig.java
@@ -18,6 +18,8 @@
*/
package org.apache.storm.trident.windowing.config;
+import org.apache.storm.trident.windowing.strategy.WindowStrategy;
+
import java.io.Serializable;
/**
@@ -42,7 +44,7 @@ public interface WindowConfig extends Serializable {
*
* @return
*/
- public Type getWindowType();
+ public <T> WindowStrategy<T> getWindowStrategy();
public void validate();
http://git-wip-us.apache.org/repos/asf/storm/blob/1bac2a9a/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/WindowStrategyFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/WindowStrategyFactory.java b/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/WindowStrategyFactory.java
deleted file mode 100644
index d8a3918..0000000
--- a/storm-core/src/jvm/org/apache/storm/trident/windowing/strategy/WindowStrategyFactory.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.trident.windowing.strategy;
-
-import org.apache.storm.trident.windowing.config.WindowConfig;
-
-/**
- *
- */
-public final class WindowStrategyFactory {
-
- private WindowStrategyFactory() {
- }
-
- /**
- * Creates a {@code WindowStrategy} instance based on the given {@code windowConfig}.
- *
- * @param windowConfig
- * @return
- */
- public static <T> WindowStrategy<T> create(WindowConfig windowConfig) {
- WindowStrategy<T> windowStrategy = null;
- WindowConfig.Type windowType = windowConfig.getWindowType();
- switch(windowType) {
- case SLIDING_COUNT:
- windowStrategy = new SlidingCountWindowStrategy<>(windowConfig);
- break;
- case TUMBLING_COUNT:
- windowStrategy = new TumblingCountWindowStrategy<>(windowConfig);
- break;
- case SLIDING_DURATION:
- windowStrategy = new SlidingDurationWindowStrategy<>(windowConfig);
- break;
- case TUMBLING_DURATION:
- windowStrategy = new TumblingDurationWindowStrategy<>(windowConfig);
- break;
- default:
- throw new IllegalArgumentException("Given WindowConfig of type "+windowType+" is not supported");
- }
-
- return windowStrategy;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/1bac2a9a/storm-core/test/jvm/org/apache/storm/trident/TridentWindowingTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/trident/TridentWindowingTest.java b/storm-core/test/jvm/org/apache/storm/trident/TridentWindowingTest.java
index 03f298d..4b82b89 100644
--- a/storm-core/test/jvm/org/apache/storm/trident/TridentWindowingTest.java
+++ b/storm-core/test/jvm/org/apache/storm/trident/TridentWindowingTest.java
@@ -24,15 +24,8 @@ import org.apache.storm.trident.windowing.config.SlidingCountWindow;
import org.apache.storm.trident.windowing.config.SlidingDurationWindow;
import org.apache.storm.trident.windowing.config.TumblingCountWindow;
import org.apache.storm.trident.windowing.config.TumblingDurationWindow;
-import org.apache.storm.trident.windowing.strategy.SlidingCountWindowStrategy;
-import org.apache.storm.trident.windowing.strategy.SlidingDurationWindowStrategy;
-import org.apache.storm.trident.windowing.strategy.TumblingCountWindowStrategy;
-import org.apache.storm.trident.windowing.strategy.TumblingDurationWindowStrategy;
-import org.apache.storm.trident.windowing.strategy.WindowStrategy;
-import org.apache.storm.trident.windowing.strategy.WindowStrategyFactory;
-import org.junit.After;
+import org.apache.storm.trident.windowing.strategy.*;
import org.junit.Assert;
-import org.junit.Before;
import org.junit.Test;
import java.util.concurrent.TimeUnit;
@@ -45,19 +38,21 @@ public class TridentWindowingTest {
@Test
public void testWindowStrategyInstances() throws Exception {
- WindowStrategy<Object> tumblingCountStrategy = WindowStrategyFactory.create(TumblingCountWindow.of(10));
+ WindowStrategy<Object> tumblingCountStrategy = TumblingCountWindow.of(10).getWindowStrategy();
Assert.assertTrue(tumblingCountStrategy instanceof TumblingCountWindowStrategy);
- WindowStrategy<Object> slidingCountStrategy = WindowStrategyFactory.create(SlidingCountWindow.of(100, 10));
+ WindowStrategy<Object> slidingCountStrategy = SlidingCountWindow.of(100, 10).getWindowStrategy();
Assert.assertTrue(slidingCountStrategy instanceof SlidingCountWindowStrategy);
- WindowStrategy<Object> tumblingDurationStrategy = WindowStrategyFactory.create(
- TumblingDurationWindow.of(new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS)));
+ WindowStrategy<Object> tumblingDurationStrategy = TumblingDurationWindow.of(
+ new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS))
+ .getWindowStrategy();
Assert.assertTrue(tumblingDurationStrategy instanceof TumblingDurationWindowStrategy);
- WindowStrategy<Object> slidingDurationStrategy = WindowStrategyFactory.create(
- SlidingDurationWindow.of(new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS),
- new BaseWindowedBolt.Duration(2, TimeUnit.SECONDS)));
+ WindowStrategy<Object> slidingDurationStrategy = SlidingDurationWindow.of(
+ new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS),
+ new BaseWindowedBolt.Duration(2, TimeUnit.SECONDS))
+ .getWindowStrategy();
Assert.assertTrue(slidingDurationStrategy instanceof SlidingDurationWindowStrategy);
}