You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by nw...@apache.org on 2018/12/18 22:28:42 UTC
[incubator-heron] branch master updated: Refactor window operations
to avoid WindowConfigImpl cast (#3123)
This is an automated email from the ASF dual-hosted git repository.
nwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push:
new 6d56a09 Refactor window operations to avoid WindowConfigImpl cast (#3123)
6d56a09 is described below
commit 6d56a099f1a6c1fc19086c1145f0e7bb42955e5f
Author: Ning Wang <nw...@twitter.com>
AuthorDate: Tue Dec 18 14:28:36 2018 -0800
Refactor window operations to avoid WindowConfigImpl cast (#3123)
* Refactor window operations to avoid WindowConfigImpl cast
---
.../org/apache/heron/streamlet/WindowConfig.java | 29 ++++++--
.../heron/streamlet/impl/WindowConfigImpl.java | 77 ----------------------
.../GeneralReduceByKeyAndWindowOperator.java | 5 +-
.../GeneralReduceByKeyAndWindowStreamlet.java | 7 +-
.../streamlet/impl/streamlets/JoinStreamlet.java | 7 +-
.../streamlets/ReduceByKeyAndWindowStreamlet.java | 7 +-
.../impl/windowings/CountWindowConfig.java | 49 ++++++++++++++
.../impl/windowings/CustomWindowConfig.java | 53 +++++++++++++++
.../impl/windowings/TimeWindowConfig.java | 50 ++++++++++++++
9 files changed, 187 insertions(+), 97 deletions(-)
diff --git a/heron/api/src/java/org/apache/heron/streamlet/WindowConfig.java b/heron/api/src/java/org/apache/heron/streamlet/WindowConfig.java
index 1e32dc3..800321c 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/WindowConfig.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/WindowConfig.java
@@ -23,10 +23,13 @@ package org.apache.heron.streamlet;
import java.time.Duration;
+import org.apache.heron.api.bolt.BaseWindowedBolt;
import org.apache.heron.api.tuple.Tuple;
import org.apache.heron.api.windowing.EvictionPolicy;
import org.apache.heron.api.windowing.TriggerPolicy;
-import org.apache.heron.streamlet.impl.WindowConfigImpl;
+import org.apache.heron.streamlet.impl.windowings.CountWindowConfig;
+import org.apache.heron.streamlet.impl.windowings.CustomWindowConfig;
+import org.apache.heron.streamlet.impl.windowings.TimeWindowConfig;
/**
* WindowConfig allows Streamlet API users to program window configuration for operations
@@ -34,13 +37,27 @@ import org.apache.heron.streamlet.impl.WindowConfigImpl;
* sliding/tumbling windows.
*/
public interface WindowConfig {
+
+ /**
+ * Apply this WindowConfig object to a bolt object
+ * @param bolt the target bolt object
+ */
+ void applyTo(BaseWindowedBolt bolt);
+
+ /**
+ * This dummy method exists only to keep WindowConfig from qualifying as a Java functional interface,
+ * which would otherwise cause ambiguous-reference compile errors. If another abstract method is ever
+ * added to WindowConfig, this dummy method can be safely removed.
+ */
+ void Dummy();
+
/**
* Creates a time based tumbling window of windowDuration
* @param windowDuration the duration of the tumbling window
* @return WindowConfig that can be passed to the transformation
*/
static WindowConfig TumblingTimeWindow(Duration windowDuration) {
- return new WindowConfigImpl(windowDuration, windowDuration);
+ return new TimeWindowConfig(windowDuration, windowDuration);
}
/**
@@ -51,7 +68,7 @@ public interface WindowConfig {
* @return WindowConfig that can be passed to the transformation
*/
static WindowConfig SlidingTimeWindow(Duration windowDuration, Duration slideInterval) {
- return new WindowConfigImpl(windowDuration, slideInterval);
+ return new TimeWindowConfig(windowDuration, slideInterval);
}
/**
@@ -60,7 +77,7 @@ public interface WindowConfig {
* @return WindowConfig that can be passed to the transformation
*/
static WindowConfig TumblingCountWindow(int windowSize) {
- return new WindowConfigImpl(windowSize, windowSize);
+ return new CountWindowConfig(windowSize, windowSize);
}
/**
@@ -71,7 +88,7 @@ public interface WindowConfig {
* @return WindowConfig that can be passed to the transformation
*/
static WindowConfig SlidingCountWindow(int windowSize, int slideSize) {
- return new WindowConfigImpl(windowSize, slideSize);
+ return new CountWindowConfig(windowSize, slideSize);
}
/**
@@ -82,6 +99,6 @@ public interface WindowConfig {
*/
static WindowConfig CustomWindow(TriggerPolicy<Tuple, ?> triggerPolicy,
EvictionPolicy<Tuple, ?> evictionPolicy) {
- return new WindowConfigImpl(triggerPolicy, evictionPolicy);
+ return new CustomWindowConfig(triggerPolicy, evictionPolicy);
}
}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/WindowConfigImpl.java b/heron/api/src/java/org/apache/heron/streamlet/impl/WindowConfigImpl.java
deleted file mode 100644
index 09c29f5..0000000
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/WindowConfigImpl.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.heron.streamlet.impl;
-
-import java.time.Duration;
-
-import org.apache.heron.api.bolt.BaseWindowedBolt;
-import org.apache.heron.api.tuple.Tuple;
-import org.apache.heron.api.windowing.EvictionPolicy;
-import org.apache.heron.api.windowing.TriggerPolicy;
-import org.apache.heron.streamlet.WindowConfig;
-
-/**
- * WindowConfigImpl implements the WindowConfig interface.
- */
-public final class WindowConfigImpl implements WindowConfig {
- private enum WindowType { TIME, COUNT, CUSTOM }
- private WindowType windowType;
- private int windowSize;
- private int slideInterval;
- private Duration windowDuration;
- private Duration slidingIntervalDuration;
- private TriggerPolicy<Tuple, ?> triggerPolicy;
- private EvictionPolicy<Tuple, ?> evictionPolicy;
-
- public WindowConfigImpl(Duration windowDuration, Duration slidingIntervalDuration) {
- this.windowType = WindowType.TIME;
- this.windowDuration = windowDuration;
- this.slidingIntervalDuration = slidingIntervalDuration;
- }
- public WindowConfigImpl(int windowSize, int slideInterval) {
- this.windowType = WindowType.COUNT;
- this.windowSize = windowSize;
- this.slideInterval = slideInterval;
- }
- public WindowConfigImpl(TriggerPolicy<Tuple, ?> triggerPolicy,
- EvictionPolicy<Tuple, ?> evictionPolicy) {
- this.windowType = WindowType.CUSTOM;
- this.triggerPolicy = triggerPolicy;
- this.evictionPolicy = evictionPolicy;
- }
-
- public void attachWindowConfig(BaseWindowedBolt bolt) {
- switch(windowType) {
- case COUNT:
- bolt.withWindow(BaseWindowedBolt.Count.of(windowSize),
- BaseWindowedBolt.Count.of(slideInterval));
- break;
- case TIME:
- bolt.withWindow(windowDuration, slidingIntervalDuration);
- break;
- case CUSTOM:
- bolt.withCustomEvictor(evictionPolicy);
- bolt.withCustomTrigger(triggerPolicy);
- break;
- default:
- throw new RuntimeException("Unknown windowType " + String.valueOf(windowType));
- }
- }
-}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/GeneralReduceByKeyAndWindowOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/GeneralReduceByKeyAndWindowOperator.java
index fe6d458..fbdc4db 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/GeneralReduceByKeyAndWindowOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/GeneralReduceByKeyAndWindowOperator.java
@@ -43,8 +43,9 @@ public class GeneralReduceByKeyAndWindowOperator<K, V, VR> extends StreamletWind
private VR identity;
private SerializableBiFunction<VR, V, ? extends VR> reduceFn;
- public GeneralReduceByKeyAndWindowOperator(SerializableFunction<V, K> keyExtractor, VR identity,
- SerializableBiFunction<VR, V, ? extends VR> reduceFn) {
+ public GeneralReduceByKeyAndWindowOperator(SerializableFunction<V, K> keyExtractor,
+ VR identity,
+ SerializableBiFunction<VR, V, ? extends VR> reduceFn) {
this.keyExtractor = keyExtractor;
this.identity = identity;
this.reduceFn = reduceFn;
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/GeneralReduceByKeyAndWindowStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/GeneralReduceByKeyAndWindowStreamlet.java
index 3f0abd8..b66956f 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/GeneralReduceByKeyAndWindowStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/GeneralReduceByKeyAndWindowStreamlet.java
@@ -29,7 +29,6 @@ import org.apache.heron.streamlet.SerializableBiFunction;
import org.apache.heron.streamlet.SerializableFunction;
import org.apache.heron.streamlet.WindowConfig;
import org.apache.heron.streamlet.impl.StreamletImpl;
-import org.apache.heron.streamlet.impl.WindowConfigImpl;
import org.apache.heron.streamlet.impl.groupings.ReduceByKeyAndWindowCustomGrouping;
import org.apache.heron.streamlet.impl.operators.GeneralReduceByKeyAndWindowOperator;
@@ -44,7 +43,7 @@ public class GeneralReduceByKeyAndWindowStreamlet<K, V, VR>
extends StreamletImpl<KeyValue<KeyedWindow<K>, VR>> {
private StreamletImpl<V> parent;
private SerializableFunction<V, K> keyExtractor;
- private WindowConfigImpl windowCfg;
+ private WindowConfig windowCfg;
private VR identity;
private SerializableBiFunction<VR, V, ? extends VR> reduceFn;
@@ -55,7 +54,7 @@ public class GeneralReduceByKeyAndWindowStreamlet<K, V, VR>
SerializableBiFunction<VR, V, ? extends VR> reduceFn) {
this.parent = parent;
this.keyExtractor = keyExtractor;
- this.windowCfg = (WindowConfigImpl) windowCfg;
+ this.windowCfg = windowCfg;
this.identity = identity;
this.reduceFn = reduceFn;
setNumPartitions(parent.getNumPartitions());
@@ -66,7 +65,7 @@ public class GeneralReduceByKeyAndWindowStreamlet<K, V, VR>
setDefaultNameIfNone(StreamletNamePrefix.REDUCE, stageNames);
GeneralReduceByKeyAndWindowOperator<K, V, VR> bolt =
new GeneralReduceByKeyAndWindowOperator<K, V, VR>(keyExtractor, identity, reduceFn);
- windowCfg.attachWindowConfig(bolt);
+ windowCfg.applyTo(bolt);
bldr.setBolt(getName(), bolt, getNumPartitions())
.customGrouping(parent.getName(), parent.getStreamId(),
new ReduceByKeyAndWindowCustomGrouping<K, V>(keyExtractor));
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/JoinStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/JoinStreamlet.java
index 89c6de9..3f3effb 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/JoinStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/JoinStreamlet.java
@@ -30,7 +30,6 @@ import org.apache.heron.streamlet.SerializableBiFunction;
import org.apache.heron.streamlet.SerializableFunction;
import org.apache.heron.streamlet.WindowConfig;
import org.apache.heron.streamlet.impl.StreamletImpl;
-import org.apache.heron.streamlet.impl.WindowConfigImpl;
import org.apache.heron.streamlet.impl.groupings.JoinCustomGrouping;
import org.apache.heron.streamlet.impl.operators.JoinOperator;
@@ -47,7 +46,7 @@ public final class JoinStreamlet<K, R, S, T> extends StreamletImpl<KeyValue<Keye
private StreamletImpl<S> right;
private SerializableFunction<R, K> leftKeyExtractor;
private SerializableFunction<S, K> rightKeyExtractor;
- private WindowConfigImpl windowCfg;
+ private WindowConfig windowCfg;
private SerializableBiFunction<R, S, ? extends T> joinFn;
public static <A, B, C, D> JoinStreamlet<A, B, C, D>
@@ -73,7 +72,7 @@ public final class JoinStreamlet<K, R, S, T> extends StreamletImpl<KeyValue<Keye
this.right = right;
this.leftKeyExtractor = leftKeyExtractor;
this.rightKeyExtractor = rightKeyExtractor;
- this.windowCfg = (WindowConfigImpl) windowCfg;
+ this.windowCfg = windowCfg;
this.joinFn = joinFn;
setNumPartitions(left.getNumPartitions());
}
@@ -90,7 +89,7 @@ public final class JoinStreamlet<K, R, S, T> extends StreamletImpl<KeyValue<Keye
setDefaultNameIfNone(StreamletNamePrefix.JOIN, stageNames);
JoinOperator<K, R, S, T> bolt = new JoinOperator<>(joinType, left.getName(),
right.getName(), leftKeyExtractor, rightKeyExtractor, joinFn);
- windowCfg.attachWindowConfig(bolt);
+ windowCfg.applyTo(bolt);
bldr.setBolt(getName(), bolt, getNumPartitions())
.customGrouping(left.getName(), left.getStreamId(),
new JoinCustomGrouping<K, R>(leftKeyExtractor))
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ReduceByKeyAndWindowStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ReduceByKeyAndWindowStreamlet.java
index 7e7a7e1..24052a8 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ReduceByKeyAndWindowStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ReduceByKeyAndWindowStreamlet.java
@@ -29,7 +29,6 @@ import org.apache.heron.streamlet.SerializableBinaryOperator;
import org.apache.heron.streamlet.SerializableFunction;
import org.apache.heron.streamlet.WindowConfig;
import org.apache.heron.streamlet.impl.StreamletImpl;
-import org.apache.heron.streamlet.impl.WindowConfigImpl;
import org.apache.heron.streamlet.impl.groupings.ReduceByKeyAndWindowCustomGrouping;
import org.apache.heron.streamlet.impl.operators.ReduceByKeyAndWindowOperator;
@@ -45,7 +44,7 @@ public class ReduceByKeyAndWindowStreamlet<K, V, R>
private StreamletImpl<R> parent;
private SerializableFunction<R, K> keyExtractor;
private SerializableFunction<R, V> valueExtractor;
- private WindowConfigImpl windowCfg;
+ private WindowConfig windowCfg;
private SerializableBinaryOperator<V> reduceFn;
public ReduceByKeyAndWindowStreamlet(StreamletImpl<R> parent,
@@ -56,7 +55,7 @@ public class ReduceByKeyAndWindowStreamlet<K, V, R>
this.parent = parent;
this.keyExtractor = keyExtractor;
this.valueExtractor = valueExtractor;
- this.windowCfg = (WindowConfigImpl) windowCfg;
+ this.windowCfg = windowCfg;
this.reduceFn = reduceFn;
setNumPartitions(parent.getNumPartitions());
}
@@ -66,7 +65,7 @@ public class ReduceByKeyAndWindowStreamlet<K, V, R>
setDefaultNameIfNone(StreamletNamePrefix.REDUCE, stageNames);
ReduceByKeyAndWindowOperator<K, V, R> bolt = new ReduceByKeyAndWindowOperator<>(keyExtractor,
valueExtractor, reduceFn);
- windowCfg.attachWindowConfig(bolt);
+ windowCfg.applyTo(bolt);
bldr.setBolt(getName(), bolt, getNumPartitions())
.customGrouping(parent.getName(), parent.getStreamId(),
new ReduceByKeyAndWindowCustomGrouping<K, R>(keyExtractor));
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/windowings/CountWindowConfig.java b/heron/api/src/java/org/apache/heron/streamlet/impl/windowings/CountWindowConfig.java
new file mode 100644
index 0000000..ebe67d9
--- /dev/null
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/windowings/CountWindowConfig.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.streamlet.impl.windowings;
+
+import org.apache.heron.api.bolt.BaseWindowedBolt;
+import org.apache.heron.streamlet.WindowConfig;
+
+/**
+ * CountWindowConfig implements a count based WindowConfig.
+ */
+public final class CountWindowConfig implements WindowConfig {
+ private int windowSize;
+ private int slideInterval;
+
+ public CountWindowConfig(int windowSize, int slideInterval) {
+ this.windowSize = windowSize;
+ this.slideInterval = slideInterval;
+ }
+
+ /**
+ * Apply this WindowConfig object to a bolt object
+ * @param bolt the target bolt object
+ */
+ @Override
+ public void applyTo(BaseWindowedBolt bolt) {
+ bolt.withWindow(BaseWindowedBolt.Count.of(windowSize),
+ BaseWindowedBolt.Count.of(slideInterval));
+ }
+
+ @Override
+ public void Dummy() { }
+}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/windowings/CustomWindowConfig.java b/heron/api/src/java/org/apache/heron/streamlet/impl/windowings/CustomWindowConfig.java
new file mode 100644
index 0000000..21a03d8
--- /dev/null
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/windowings/CustomWindowConfig.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.streamlet.impl.windowings;
+
+import org.apache.heron.api.bolt.BaseWindowedBolt;
+import org.apache.heron.api.tuple.Tuple;
+import org.apache.heron.api.windowing.EvictionPolicy;
+import org.apache.heron.api.windowing.TriggerPolicy;
+import org.apache.heron.streamlet.WindowConfig;
+
+/**
+ * CustomWindowConfig implements a trigger/eviction based WindowConfig.
+ */
+public final class CustomWindowConfig implements WindowConfig {
+ private TriggerPolicy<Tuple, ?> triggerPolicy;
+ private EvictionPolicy<Tuple, ?> evictionPolicy;
+
+ public CustomWindowConfig(TriggerPolicy<Tuple, ?> triggerPolicy,
+ EvictionPolicy<Tuple, ?> evictionPolicy) {
+ this.triggerPolicy = triggerPolicy;
+ this.evictionPolicy = evictionPolicy;
+ }
+
+ /**
+ * Apply this WindowConfig object to a bolt object
+ * @param bolt the target bolt object
+ */
+ @Override
+ public void applyTo(BaseWindowedBolt bolt) {
+ bolt.withCustomEvictor(evictionPolicy);
+ bolt.withCustomTrigger(triggerPolicy);
+ }
+
+ @Override
+ public void Dummy() { }
+}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/windowings/TimeWindowConfig.java b/heron/api/src/java/org/apache/heron/streamlet/impl/windowings/TimeWindowConfig.java
new file mode 100644
index 0000000..d6f6422
--- /dev/null
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/windowings/TimeWindowConfig.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.streamlet.impl.windowings;
+
+import java.time.Duration;
+
+import org.apache.heron.api.bolt.BaseWindowedBolt;
+import org.apache.heron.streamlet.WindowConfig;
+
+/**
+ * TimeWindowConfig implements a time based WindowConfig.
+ */
+public final class TimeWindowConfig implements WindowConfig {
+ private Duration windowDuration;
+ private Duration slidingIntervalDuration;
+
+ public TimeWindowConfig(Duration windowDuration, Duration slidingIntervalDuration) {
+ this.windowDuration = windowDuration;
+ this.slidingIntervalDuration = slidingIntervalDuration;
+ }
+
+ /**
+ * Apply this WindowConfig object to a bolt object
+ * @param bolt the target bolt object
+ */
+ @Override
+ public void applyTo(BaseWindowedBolt bolt) {
+ bolt.withWindow(windowDuration, slidingIntervalDuration);
+ }
+
+ @Override
+ public void Dummy() { }
+}