You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by GitBox <gi...@apache.org> on 2018/12/18 22:28:38 UTC

[GitHub] nwangtw closed pull request #3123: Refactor window operations to avoid WindowConfigImpl cast

nwangtw closed pull request #3123: Refactor window operations to avoid WindowConfigImpl cast
URL: https://github.com/apache/incubator-heron/pull/3123
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub would otherwise no longer display it after the merge):

diff --git a/heron/api/src/java/org/apache/heron/streamlet/WindowConfig.java b/heron/api/src/java/org/apache/heron/streamlet/WindowConfig.java
index 1e32dc35d1..800321cdaa 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/WindowConfig.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/WindowConfig.java
@@ -23,10 +23,13 @@
 
 import java.time.Duration;
 
+import org.apache.heron.api.bolt.BaseWindowedBolt;
 import org.apache.heron.api.tuple.Tuple;
 import org.apache.heron.api.windowing.EvictionPolicy;
 import org.apache.heron.api.windowing.TriggerPolicy;
-import org.apache.heron.streamlet.impl.WindowConfigImpl;
+import org.apache.heron.streamlet.impl.windowings.CountWindowConfig;
+import org.apache.heron.streamlet.impl.windowings.CustomWindowConfig;
+import org.apache.heron.streamlet.impl.windowings.TimeWindowConfig;
 
 /**
  * WindowConfig allows Streamlet API users to program window configuration for operations
@@ -34,13 +37,27 @@
  * sliding/tumbling windows.
  */
 public interface WindowConfig {
+
+  /**
+   * Apply this WindowConfig object to a bolt object
+   * @param bolt the target bolt object
+   */
+  void applyTo(BaseWindowedBolt bolt);
+
+  /**
+   * This is just a dummy function to avoid WindowConfig objects to be matched with Java functional interface
+   * and cause ambiguous reference compiling error. In case new virtual functions are needed in WindowConfig,
+   * this dummy function can be safely removed.
+   */
+  void Dummy();
+
   /**
    * Creates a time based tumbling window of windowDuration
    * @param windowDuration the duration of the tumbling window
    * @return WindowConfig that can be passed to the transformation
    */
   static WindowConfig TumblingTimeWindow(Duration windowDuration) {
-    return new WindowConfigImpl(windowDuration, windowDuration);
+    return new TimeWindowConfig(windowDuration, windowDuration);
   }
 
   /**
@@ -51,7 +68,7 @@ static WindowConfig TumblingTimeWindow(Duration windowDuration) {
    * @return WindowConfig that can be passed to the transformation
    */
   static WindowConfig SlidingTimeWindow(Duration windowDuration, Duration slideInterval) {
-    return new WindowConfigImpl(windowDuration, slideInterval);
+    return new TimeWindowConfig(windowDuration, slideInterval);
   }
 
   /**
@@ -60,7 +77,7 @@ static WindowConfig SlidingTimeWindow(Duration windowDuration, Duration slideInt
    * @return WindowConfig that can be passed to the transformation
    */
   static WindowConfig TumblingCountWindow(int windowSize) {
-    return new WindowConfigImpl(windowSize, windowSize);
+    return new CountWindowConfig(windowSize, windowSize);
   }
 
   /**
@@ -71,7 +88,7 @@ static WindowConfig TumblingCountWindow(int windowSize) {
    * @return WindowConfig that can be passed to the transformation
    */
   static WindowConfig SlidingCountWindow(int windowSize, int slideSize) {
-    return new WindowConfigImpl(windowSize, slideSize);
+    return new CountWindowConfig(windowSize, slideSize);
   }
 
   /**
@@ -82,6 +99,6 @@ static WindowConfig SlidingCountWindow(int windowSize, int slideSize) {
    */
   static WindowConfig CustomWindow(TriggerPolicy<Tuple, ?> triggerPolicy,
                                    EvictionPolicy<Tuple, ?> evictionPolicy) {
-    return new WindowConfigImpl(triggerPolicy, evictionPolicy);
+    return new CustomWindowConfig(triggerPolicy, evictionPolicy);
   }
 }
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/WindowConfigImpl.java b/heron/api/src/java/org/apache/heron/streamlet/impl/WindowConfigImpl.java
deleted file mode 100644
index 09c29f5282..0000000000
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/WindowConfigImpl.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.heron.streamlet.impl;
-
-import java.time.Duration;
-
-import org.apache.heron.api.bolt.BaseWindowedBolt;
-import org.apache.heron.api.tuple.Tuple;
-import org.apache.heron.api.windowing.EvictionPolicy;
-import org.apache.heron.api.windowing.TriggerPolicy;
-import org.apache.heron.streamlet.WindowConfig;
-
-/**
- * WindowConfigImpl implements the WindowConfig interface.
- */
-public final class WindowConfigImpl implements WindowConfig {
-  private enum WindowType { TIME, COUNT, CUSTOM }
-  private WindowType windowType;
-  private int windowSize;
-  private int slideInterval;
-  private Duration windowDuration;
-  private Duration slidingIntervalDuration;
-  private TriggerPolicy<Tuple, ?> triggerPolicy;
-  private EvictionPolicy<Tuple, ?> evictionPolicy;
-
-  public WindowConfigImpl(Duration windowDuration, Duration slidingIntervalDuration) {
-    this.windowType = WindowType.TIME;
-    this.windowDuration = windowDuration;
-    this.slidingIntervalDuration = slidingIntervalDuration;
-  }
-  public WindowConfigImpl(int windowSize, int slideInterval) {
-    this.windowType = WindowType.COUNT;
-    this.windowSize = windowSize;
-    this.slideInterval = slideInterval;
-  }
-  public WindowConfigImpl(TriggerPolicy<Tuple, ?> triggerPolicy,
-                          EvictionPolicy<Tuple, ?> evictionPolicy) {
-    this.windowType = WindowType.CUSTOM;
-    this.triggerPolicy = triggerPolicy;
-    this.evictionPolicy = evictionPolicy;
-  }
-
-  public void attachWindowConfig(BaseWindowedBolt bolt) {
-    switch(windowType) {
-      case COUNT:
-        bolt.withWindow(BaseWindowedBolt.Count.of(windowSize),
-                        BaseWindowedBolt.Count.of(slideInterval));
-        break;
-      case TIME:
-        bolt.withWindow(windowDuration, slidingIntervalDuration);
-        break;
-      case CUSTOM:
-        bolt.withCustomEvictor(evictionPolicy);
-        bolt.withCustomTrigger(triggerPolicy);
-        break;
-      default:
-        throw new RuntimeException("Unknown windowType " + String.valueOf(windowType));
-    }
-  }
-}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/GeneralReduceByKeyAndWindowOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/GeneralReduceByKeyAndWindowOperator.java
index fe6d4586e8..fbdc4db44f 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/GeneralReduceByKeyAndWindowOperator.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/GeneralReduceByKeyAndWindowOperator.java
@@ -43,8 +43,9 @@
   private VR identity;
   private SerializableBiFunction<VR, V, ? extends VR> reduceFn;
 
-  public GeneralReduceByKeyAndWindowOperator(SerializableFunction<V, K> keyExtractor, VR identity,
-                            SerializableBiFunction<VR, V, ? extends VR> reduceFn) {
+  public GeneralReduceByKeyAndWindowOperator(SerializableFunction<V, K> keyExtractor,
+                                             VR identity,
+                                             SerializableBiFunction<VR, V, ? extends VR> reduceFn) {
     this.keyExtractor = keyExtractor;
     this.identity = identity;
     this.reduceFn = reduceFn;
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/GeneralReduceByKeyAndWindowStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/GeneralReduceByKeyAndWindowStreamlet.java
index 3f0abd89c2..b66956f41d 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/GeneralReduceByKeyAndWindowStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/GeneralReduceByKeyAndWindowStreamlet.java
@@ -29,7 +29,6 @@
 import org.apache.heron.streamlet.SerializableFunction;
 import org.apache.heron.streamlet.WindowConfig;
 import org.apache.heron.streamlet.impl.StreamletImpl;
-import org.apache.heron.streamlet.impl.WindowConfigImpl;
 import org.apache.heron.streamlet.impl.groupings.ReduceByKeyAndWindowCustomGrouping;
 import org.apache.heron.streamlet.impl.operators.GeneralReduceByKeyAndWindowOperator;
 
@@ -44,7 +43,7 @@
     extends StreamletImpl<KeyValue<KeyedWindow<K>, VR>> {
   private StreamletImpl<V> parent;
   private SerializableFunction<V, K> keyExtractor;
-  private WindowConfigImpl windowCfg;
+  private WindowConfig windowCfg;
   private VR identity;
   private SerializableBiFunction<VR, V, ? extends VR> reduceFn;
 
@@ -55,7 +54,7 @@ public GeneralReduceByKeyAndWindowStreamlet(StreamletImpl<V> parent,
                             SerializableBiFunction<VR, V, ? extends VR> reduceFn) {
     this.parent = parent;
     this.keyExtractor = keyExtractor;
-    this.windowCfg = (WindowConfigImpl) windowCfg;
+    this.windowCfg = windowCfg;
     this.identity = identity;
     this.reduceFn = reduceFn;
     setNumPartitions(parent.getNumPartitions());
@@ -66,7 +65,7 @@ public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
     setDefaultNameIfNone(StreamletNamePrefix.REDUCE, stageNames);
     GeneralReduceByKeyAndWindowOperator<K, V, VR> bolt =
         new GeneralReduceByKeyAndWindowOperator<K, V, VR>(keyExtractor, identity, reduceFn);
-    windowCfg.attachWindowConfig(bolt);
+    windowCfg.applyTo(bolt);
     bldr.setBolt(getName(), bolt, getNumPartitions())
         .customGrouping(parent.getName(), parent.getStreamId(),
             new ReduceByKeyAndWindowCustomGrouping<K, V>(keyExtractor));
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/JoinStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/JoinStreamlet.java
index 89c6de9187..3f3effb300 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/JoinStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/JoinStreamlet.java
@@ -30,7 +30,6 @@
 import org.apache.heron.streamlet.SerializableFunction;
 import org.apache.heron.streamlet.WindowConfig;
 import org.apache.heron.streamlet.impl.StreamletImpl;
-import org.apache.heron.streamlet.impl.WindowConfigImpl;
 import org.apache.heron.streamlet.impl.groupings.JoinCustomGrouping;
 import org.apache.heron.streamlet.impl.operators.JoinOperator;
 
@@ -47,7 +46,7 @@
   private StreamletImpl<S> right;
   private SerializableFunction<R, K> leftKeyExtractor;
   private SerializableFunction<S, K> rightKeyExtractor;
-  private WindowConfigImpl windowCfg;
+  private WindowConfig windowCfg;
   private SerializableBiFunction<R, S, ? extends T> joinFn;
 
   public static <A, B, C, D> JoinStreamlet<A, B, C, D>
@@ -73,7 +72,7 @@ private JoinStreamlet(JoinType joinType, StreamletImpl<R> left,
     this.right = right;
     this.leftKeyExtractor = leftKeyExtractor;
     this.rightKeyExtractor = rightKeyExtractor;
-    this.windowCfg = (WindowConfigImpl) windowCfg;
+    this.windowCfg = windowCfg;
     this.joinFn = joinFn;
     setNumPartitions(left.getNumPartitions());
   }
@@ -90,7 +89,7 @@ public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
     setDefaultNameIfNone(StreamletNamePrefix.JOIN, stageNames);
     JoinOperator<K, R, S, T> bolt = new JoinOperator<>(joinType, left.getName(),
         right.getName(), leftKeyExtractor, rightKeyExtractor, joinFn);
-    windowCfg.attachWindowConfig(bolt);
+    windowCfg.applyTo(bolt);
     bldr.setBolt(getName(), bolt, getNumPartitions())
         .customGrouping(left.getName(), left.getStreamId(),
             new JoinCustomGrouping<K, R>(leftKeyExtractor))
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ReduceByKeyAndWindowStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ReduceByKeyAndWindowStreamlet.java
index 7e7a7e1f88..24052a8ef0 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ReduceByKeyAndWindowStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ReduceByKeyAndWindowStreamlet.java
@@ -29,7 +29,6 @@
 import org.apache.heron.streamlet.SerializableFunction;
 import org.apache.heron.streamlet.WindowConfig;
 import org.apache.heron.streamlet.impl.StreamletImpl;
-import org.apache.heron.streamlet.impl.WindowConfigImpl;
 import org.apache.heron.streamlet.impl.groupings.ReduceByKeyAndWindowCustomGrouping;
 import org.apache.heron.streamlet.impl.operators.ReduceByKeyAndWindowOperator;
 
@@ -45,7 +44,7 @@
   private StreamletImpl<R> parent;
   private SerializableFunction<R, K> keyExtractor;
   private SerializableFunction<R, V> valueExtractor;
-  private WindowConfigImpl windowCfg;
+  private WindowConfig windowCfg;
   private SerializableBinaryOperator<V> reduceFn;
 
   public ReduceByKeyAndWindowStreamlet(StreamletImpl<R> parent,
@@ -56,7 +55,7 @@ public ReduceByKeyAndWindowStreamlet(StreamletImpl<R> parent,
     this.parent = parent;
     this.keyExtractor = keyExtractor;
     this.valueExtractor = valueExtractor;
-    this.windowCfg = (WindowConfigImpl) windowCfg;
+    this.windowCfg = windowCfg;
     this.reduceFn = reduceFn;
     setNumPartitions(parent.getNumPartitions());
   }
@@ -66,7 +65,7 @@ public boolean doBuild(TopologyBuilder bldr, Set<String> stageNames) {
     setDefaultNameIfNone(StreamletNamePrefix.REDUCE, stageNames);
     ReduceByKeyAndWindowOperator<K, V, R> bolt = new ReduceByKeyAndWindowOperator<>(keyExtractor,
         valueExtractor, reduceFn);
-    windowCfg.attachWindowConfig(bolt);
+    windowCfg.applyTo(bolt);
     bldr.setBolt(getName(), bolt, getNumPartitions())
         .customGrouping(parent.getName(), parent.getStreamId(),
             new ReduceByKeyAndWindowCustomGrouping<K, R>(keyExtractor));
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/windowings/CountWindowConfig.java b/heron/api/src/java/org/apache/heron/streamlet/impl/windowings/CountWindowConfig.java
new file mode 100644
index 0000000000..ebe67d9786
--- /dev/null
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/windowings/CountWindowConfig.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.streamlet.impl.windowings;
+
+import org.apache.heron.api.bolt.BaseWindowedBolt;
+import org.apache.heron.streamlet.WindowConfig;
+
+/**
+ * CountWindowConfig implements a count based WindowConfig.
+ */
+public final class CountWindowConfig implements WindowConfig {
+  private int windowSize;
+  private int slideInterval;
+
+  public CountWindowConfig(int windowSize, int slideInterval) {
+    this.windowSize = windowSize;
+    this.slideInterval = slideInterval;
+  }
+
+  /**
+   * Apply this WindowConfig object to a bolt object
+   * @param bolt the target bolt object
+   */
+  @Override
+  public void applyTo(BaseWindowedBolt bolt) {
+    bolt.withWindow(BaseWindowedBolt.Count.of(windowSize),
+                    BaseWindowedBolt.Count.of(slideInterval));
+  }
+
+  @Override
+  public void Dummy() { }
+}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/windowings/CustomWindowConfig.java b/heron/api/src/java/org/apache/heron/streamlet/impl/windowings/CustomWindowConfig.java
new file mode 100644
index 0000000000..21a03d89ca
--- /dev/null
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/windowings/CustomWindowConfig.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.streamlet.impl.windowings;
+
+import org.apache.heron.api.bolt.BaseWindowedBolt;
+import org.apache.heron.api.tuple.Tuple;
+import org.apache.heron.api.windowing.EvictionPolicy;
+import org.apache.heron.api.windowing.TriggerPolicy;
+import org.apache.heron.streamlet.WindowConfig;
+
+/**
+ * CustomWindowConfig implements a trigger/eviction based WindowConfig.
+ */
+public final class CustomWindowConfig implements WindowConfig {
+  private TriggerPolicy<Tuple, ?> triggerPolicy;
+  private EvictionPolicy<Tuple, ?> evictionPolicy;
+
+  public CustomWindowConfig(TriggerPolicy<Tuple, ?> triggerPolicy,
+                          EvictionPolicy<Tuple, ?> evictionPolicy) {
+    this.triggerPolicy = triggerPolicy;
+    this.evictionPolicy = evictionPolicy;
+  }
+
+  /**
+   * Apply this WindowConfig object to a bolt object
+   * @param bolt the target bolt object
+   */
+  @Override
+  public void applyTo(BaseWindowedBolt bolt) {
+    bolt.withCustomEvictor(evictionPolicy);
+    bolt.withCustomTrigger(triggerPolicy);
+  }
+
+  @Override
+  public void Dummy() { }
+}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/windowings/TimeWindowConfig.java b/heron/api/src/java/org/apache/heron/streamlet/impl/windowings/TimeWindowConfig.java
new file mode 100644
index 0000000000..d6f642297a
--- /dev/null
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/windowings/TimeWindowConfig.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.streamlet.impl.windowings;
+
+import java.time.Duration;
+
+import org.apache.heron.api.bolt.BaseWindowedBolt;
+import org.apache.heron.streamlet.WindowConfig;
+
+/**
+ * TimeWindowConfig implements a time based WindowConfig.
+ */
+public final class TimeWindowConfig implements WindowConfig {
+  private Duration windowDuration;
+  private Duration slidingIntervalDuration;
+
+  public TimeWindowConfig(Duration windowDuration, Duration slidingIntervalDuration) {
+    this.windowDuration = windowDuration;
+    this.slidingIntervalDuration = slidingIntervalDuration;
+  }
+
+  /**
+   * Apply this WindowConfig object to a bolt object
+   * @param bolt the target bolt object
+   */
+  @Override
+  public void applyTo(BaseWindowedBolt bolt) {
+    bolt.withWindow(windowDuration, slidingIntervalDuration);
+  }
+
+  @Override
+  public void Dummy() { }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services