You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by GitBox <gi...@apache.org> on 2019/01/07 19:36:12 UTC

[GitHub] nwangtw closed pull request #3135: Refactor KVStreamlet to make it lighter and easier to convert from St…

nwangtw closed pull request #3135: Refactor KVStreamlet to make it lighter and easier to convert from St…
URL: https://github.com/apache/incubator-heron/pull/3135
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/KVStreamletImpl.java b/heron/api/src/java/org/apache/heron/streamlet/impl/KVStreamletImpl.java
deleted file mode 100644
index e9590acce1..0000000000
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/KVStreamletImpl.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.heron.streamlet.impl;
-
-import org.apache.heron.streamlet.KVStreamlet;
-import org.apache.heron.streamlet.KeyValue;
-
-/**
- * A KVStreamlet is a Streamlet with KeyValue data.
- */
-public abstract class KVStreamletImpl<K, V>
-    extends StreamletImpl<KeyValue<K, V>>
-    implements KVStreamlet<K, V> {
-}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/KVStreamletShadow.java b/heron/api/src/java/org/apache/heron/streamlet/impl/KVStreamletShadow.java
new file mode 100644
index 0000000000..bed7e92110
--- /dev/null
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/KVStreamletShadow.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.streamlet.impl.streamlets;
+
+import org.apache.heron.streamlet.KVStreamlet;
+import org.apache.heron.streamlet.KeyValue;
+import org.apache.heron.streamlet.impl.StreamletImpl;
+
+/**
+ * KVStreamletShadow is a decorator for StreamletImpl<KeyValue<?, ?>> objects.
+ * Please check StreamletShadow comments for more details.
+ *
+ * Usage:
+ * To create a shadow object from an existing
+ * StreamletImpl<KeyValue<K, V>> object (stream):
+ *
+ * KVStreamlet<K, V> kv = new KVStreamletShadow<K, V>(stream)
+ *
+ */
+public class KVStreamletShadow<K, V>
+    extends StreamletShadow<KeyValue<K, V>>
+    implements KVStreamlet<K, V> {
+
+  // Note that KVStreamletShadow is constructed from StreamletImpl
+  public KVStreamletShadow(StreamletImpl<KeyValue<K, V>> real) {
+    super(real);
+  }
+
+  /*
+   * Functions accessible by child objects need to be overridden (forwarding the call to
+   * the real object since shadow object shouldn't have them)
+   */
+  @Override
+  public KVStreamletShadow<K, V> setName(String sName) {
+    super.setName(sName);
+    return this;
+  }
+
+  @Override
+  public KVStreamletShadow<K, V> setNumPartitions(int numPartitions) {
+    super.setNumPartitions(numPartitions);
+    return this;
+  }
+
+}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
index 3afa54996e..81a7d7827e 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
@@ -53,6 +53,7 @@
 import org.apache.heron.streamlet.impl.streamlets.GeneralReduceByKeyAndWindowStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.GeneralReduceByKeyStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.JoinStreamlet;
+import org.apache.heron.streamlet.impl.streamlets.KVStreamletShadow;
 import org.apache.heron.streamlet.impl.streamlets.KeyByStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.LogStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.MapStreamlet;
@@ -450,7 +451,7 @@ private String defaultNameCalculator(StreamletNamePrefix prefix, Set<String> sta
         this, joinee, thisKeyExtractor, otherKeyExtractor, windowCfg, joinType, joinFunction);
     addChild(retval);
     joinee.addChild(retval);
-    return retval;
+    return new KVStreamletShadow<KeyedWindow<K>, T>(retval);
   }
 
   /**
@@ -471,7 +472,7 @@ private String defaultNameCalculator(StreamletNamePrefix prefix, Set<String> sta
     ReduceByKeyStreamlet<R, K, T> retval =
         new ReduceByKeyStreamlet<>(this, keyExtractor, valueExtractor, reduceFn);
     addChild(retval);
-    return retval;
+    return new KVStreamletShadow<K, T>(retval);
   }
 
   /**
@@ -491,7 +492,7 @@ private String defaultNameCalculator(StreamletNamePrefix prefix, Set<String> sta
     GeneralReduceByKeyStreamlet<R, K, T> retval =
         new GeneralReduceByKeyStreamlet<>(this, keyExtractor, identity, reduceFn);
     addChild(retval);
-    return retval;
+    return new KVStreamletShadow<K, T>(retval);
   }
 
   /**
@@ -517,7 +518,7 @@ private String defaultNameCalculator(StreamletNamePrefix prefix, Set<String> sta
         new ReduceByKeyAndWindowStreamlet<>(this, keyExtractor, valueExtractor,
             windowCfg, reduceFn);
     addChild(retval);
-    return retval;
+    return new KVStreamletShadow<KeyedWindow<K>, T>(retval);
   }
 
   /**
@@ -546,7 +547,7 @@ private String defaultNameCalculator(StreamletNamePrefix prefix, Set<String> sta
         new GeneralReduceByKeyAndWindowStreamlet<>(this, keyExtractor, windowCfg,
             identity, reduceFn);
     addChild(retval);
-    return retval;
+    return new KVStreamletShadow<KeyedWindow<K>, T>(retval);
   }
 
   /**
@@ -691,7 +692,7 @@ public void toSink(Sink<R> sink) {
     KeyByStreamlet<R, K, V> retval =
         new KeyByStreamlet<R, K, V>(this, keyExtractor, valueExtractor);
     addChild(retval);
-    return retval;
+    return new KVStreamletShadow<K, V>(retval);
   }
 
   /**
@@ -705,7 +706,7 @@ public void toSink(Sink<R> sink) {
 
     CountByKeyStreamlet<R, K> retval = new CountByKeyStreamlet<>(this, keyExtractor);
     addChild(retval);
-    return retval;
+    return new KVStreamletShadow<K, Long>(retval);
   }
 
 
@@ -725,6 +726,6 @@ public void toSink(Sink<R> sink) {
     CountByKeyAndWindowStreamlet<R, K> retval =
         new CountByKeyAndWindowStreamlet<>(this, keyExtractor, windowCfg);
     addChild(retval);
-    return retval;
+    return new KVStreamletShadow<KeyedWindow<K>, Long>(retval);
   }
 }
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletShadow.java b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletShadow.java
index fad23ded58..26e7eb652d 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletShadow.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletShadow.java
@@ -58,6 +58,10 @@ public StreamletShadow(StreamletImpl<R> real) {
     this.real = real;
   }
 
+  public StreamletImpl<R> getReal() {
+    return real;
+  }
+
   /**
    * Gets the stream id of this Streamlet.
    * @return the stream id of this Streamlet
@@ -71,11 +75,23 @@ public String getStreamId() {
    * Functions accessible by child objects need to be overriden (forwarding the call to
    * the real object since shadow object shouldn't have them)
    */
+  @Override
+  public StreamletShadow<R> setName(String sName) {
+    real.setName(sName);
+    return this;
+  }
+
   @Override
   public String getName() {
     return real.getName();
   }
 
+  @Override
+  public StreamletShadow<R> setNumPartitions(int numPartitions) {
+    real.setNumPartitions(numPartitions);
+    return this;
+  }
+
   @Override
   public int getNumPartitions() {
     return real.getNumPartitions();
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CountByKeyAndWindowStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CountByKeyAndWindowStreamlet.java
index 4a19e2a2b0..698c478821 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CountByKeyAndWindowStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CountByKeyAndWindowStreamlet.java
@@ -23,11 +23,11 @@
 import java.util.Set;
 
 import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.streamlet.KeyValue;
 import org.apache.heron.streamlet.KeyedWindow;
 import org.apache.heron.streamlet.SerializableFunction;
 import org.apache.heron.streamlet.StreamletReducers;
 import org.apache.heron.streamlet.WindowConfig;
-import org.apache.heron.streamlet.impl.KVStreamletImpl;
 import org.apache.heron.streamlet.impl.StreamletImpl;
 import org.apache.heron.streamlet.impl.groupings.ReduceByKeyAndWindowCustomGrouping;
 import org.apache.heron.streamlet.impl.operators.ReduceByKeyAndWindowOperator;
@@ -39,7 +39,7 @@
  * KeyWindowInfo&lt;K&gt; type and the value is Long.
  */
 public class CountByKeyAndWindowStreamlet<R, K>
-    extends KVStreamletImpl<KeyedWindow<K>, Long> {
+    extends StreamletImpl<KeyValue<KeyedWindow<K>, Long>> {
   private StreamletImpl<R> parent;
   private SerializableFunction<R, K> keyExtractor;
   private WindowConfig windowCfg;
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CountByKeyStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CountByKeyStreamlet.java
index 5c8328be4e..f339ef2d89 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CountByKeyStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CountByKeyStreamlet.java
@@ -23,9 +23,9 @@
 import java.util.Set;
 
 import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.streamlet.KeyValue;
 import org.apache.heron.streamlet.SerializableFunction;
 import org.apache.heron.streamlet.StreamletReducers;
-import org.apache.heron.streamlet.impl.KVStreamletImpl;
 import org.apache.heron.streamlet.impl.StreamletImpl;
 import org.apache.heron.streamlet.impl.groupings.ReduceByKeyAndWindowCustomGrouping;
 import org.apache.heron.streamlet.impl.operators.ReduceByKeyOperator;
@@ -38,7 +38,7 @@
  * ReduceByKeyAndWindowStreamlet's elements are of KeyValue type where the key is
  * KeyWindowInfo&lt;K&gt; type and the value is of type V.
  */
-public class CountByKeyStreamlet<R, K> extends KVStreamletImpl<K, Long> {
+public class CountByKeyStreamlet<R, K> extends StreamletImpl<KeyValue<K, Long>> {
   private StreamletImpl<R> parent;
   private SerializableFunction<R, K> keyExtractor;
 
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/GeneralReduceByKeyAndWindowStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/GeneralReduceByKeyAndWindowStreamlet.java
index c2b8747859..f95d7d24fe 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/GeneralReduceByKeyAndWindowStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/GeneralReduceByKeyAndWindowStreamlet.java
@@ -23,11 +23,11 @@
 import java.util.Set;
 
 import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.streamlet.KeyValue;
 import org.apache.heron.streamlet.KeyedWindow;
 import org.apache.heron.streamlet.SerializableBiFunction;
 import org.apache.heron.streamlet.SerializableFunction;
 import org.apache.heron.streamlet.WindowConfig;
-import org.apache.heron.streamlet.impl.KVStreamletImpl;
 import org.apache.heron.streamlet.impl.StreamletImpl;
 import org.apache.heron.streamlet.impl.groupings.ReduceByKeyAndWindowCustomGrouping;
 import org.apache.heron.streamlet.impl.operators.GeneralReduceByKeyAndWindowOperator;
@@ -40,7 +40,7 @@
  * KeyWindowInfo&lt;K&gt; type and the value is of type T.
  */
 public class GeneralReduceByKeyAndWindowStreamlet<R, K, T>
-    extends KVStreamletImpl<KeyedWindow<K>, T> {
+    extends StreamletImpl<KeyValue<KeyedWindow<K>, T>> {
   private StreamletImpl<R> parent;
   private SerializableFunction<R, K> keyExtractor;
   private WindowConfig windowCfg;
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/GeneralReduceByKeyStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/GeneralReduceByKeyStreamlet.java
index 9b4d2fe4fe..277a1e8c30 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/GeneralReduceByKeyStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/GeneralReduceByKeyStreamlet.java
@@ -23,9 +23,9 @@
 import java.util.Set;
 
 import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.streamlet.KeyValue;
 import org.apache.heron.streamlet.SerializableBiFunction;
 import org.apache.heron.streamlet.SerializableFunction;
-import org.apache.heron.streamlet.impl.KVStreamletImpl;
 import org.apache.heron.streamlet.impl.StreamletImpl;
 import org.apache.heron.streamlet.impl.groupings.ReduceByKeyAndWindowCustomGrouping;
 import org.apache.heron.streamlet.impl.operators.GeneralReduceByKeyOperator;
@@ -36,7 +36,7 @@
  * GeneralReduceByKeyStreamlet's elements are of KeyValue type where the key is
  * KeyWindowInfo&lt;K&gt; type and the value is of type T.
  */
-public class GeneralReduceByKeyStreamlet<R, K, T> extends KVStreamletImpl<K, T> {
+public class GeneralReduceByKeyStreamlet<R, K, T> extends StreamletImpl<KeyValue<K, T>> {
   private StreamletImpl<R> parent;
   private SerializableFunction<R, K> keyExtractor;
   private T identity;
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/JoinStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/JoinStreamlet.java
index ce03f97d39..40f76ef259 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/JoinStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/JoinStreamlet.java
@@ -24,11 +24,11 @@
 
 import org.apache.heron.api.topology.TopologyBuilder;
 import org.apache.heron.streamlet.JoinType;
+import org.apache.heron.streamlet.KeyValue;
 import org.apache.heron.streamlet.KeyedWindow;
 import org.apache.heron.streamlet.SerializableBiFunction;
 import org.apache.heron.streamlet.SerializableFunction;
 import org.apache.heron.streamlet.WindowConfig;
-import org.apache.heron.streamlet.impl.KVStreamletImpl;
 import org.apache.heron.streamlet.impl.StreamletImpl;
 import org.apache.heron.streamlet.impl.groupings.JoinCustomGrouping;
 import org.apache.heron.streamlet.impl.operators.JoinOperator;
@@ -40,7 +40,7 @@
  * JoinStreamlet's elements are of KeyValue type where the key is KeyWindowInfo&lt;K&gt; type
  * and the value is of type VR.
  */
-public final class JoinStreamlet<K, R, S, T> extends KVStreamletImpl<KeyedWindow<K>, T> {
+public final class JoinStreamlet<K, R, S, T> extends StreamletImpl<KeyValue<KeyedWindow<K>, T>> {
   private JoinType joinType;
   private StreamletImpl<R> left;
   private StreamletImpl<S> right;
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/KeyByStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/KeyByStreamlet.java
index df76eab900..1c1eb16b00 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/KeyByStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/KeyByStreamlet.java
@@ -23,8 +23,8 @@
 import java.util.Set;
 
 import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.streamlet.KeyValue;
 import org.apache.heron.streamlet.SerializableFunction;
-import org.apache.heron.streamlet.impl.KVStreamletImpl;
 import org.apache.heron.streamlet.impl.StreamletImpl;
 import org.apache.heron.streamlet.impl.operators.KeyByOperator;
 
@@ -32,7 +32,7 @@
  * KeyByStreamlet represents a KVStreamlet that is the result of applying key and value extractors
  * on all elements.
  */
-public class KeyByStreamlet<R, K, V> extends KVStreamletImpl<K, V> {
+public class KeyByStreamlet<R, K, V> extends StreamletImpl<KeyValue<K, V>> {
   private StreamletImpl<R> parent;
   private SerializableFunction<R, K> keyExtractor;
   private SerializableFunction<R, V> valueExtractor;
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ReduceByKeyAndWindowStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ReduceByKeyAndWindowStreamlet.java
index 2c399f3af3..18fea4d714 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ReduceByKeyAndWindowStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ReduceByKeyAndWindowStreamlet.java
@@ -23,11 +23,11 @@
 import java.util.Set;
 
 import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.streamlet.KeyValue;
 import org.apache.heron.streamlet.KeyedWindow;
 import org.apache.heron.streamlet.SerializableBinaryOperator;
 import org.apache.heron.streamlet.SerializableFunction;
 import org.apache.heron.streamlet.WindowConfig;
-import org.apache.heron.streamlet.impl.KVStreamletImpl;
 import org.apache.heron.streamlet.impl.StreamletImpl;
 import org.apache.heron.streamlet.impl.groupings.ReduceByKeyAndWindowCustomGrouping;
 import org.apache.heron.streamlet.impl.operators.ReduceByKeyAndWindowOperator;
@@ -40,7 +40,7 @@
  * KeyWindowInfo&lt;K&gt; type and the value is of type T.
  */
 public class ReduceByKeyAndWindowStreamlet<R, K, T>
-    extends KVStreamletImpl<KeyedWindow<K>, T> {
+    extends StreamletImpl<KeyValue<KeyedWindow<K>, T>> {
   private StreamletImpl<R> parent;
   private SerializableFunction<R, K> keyExtractor;
   private SerializableFunction<R, T> valueExtractor;
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ReduceByKeyStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ReduceByKeyStreamlet.java
index ea84687a0e..9176618659 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ReduceByKeyStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ReduceByKeyStreamlet.java
@@ -23,9 +23,9 @@
 import java.util.Set;
 
 import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.streamlet.KeyValue;
 import org.apache.heron.streamlet.SerializableBinaryOperator;
 import org.apache.heron.streamlet.SerializableFunction;
-import org.apache.heron.streamlet.impl.KVStreamletImpl;
 import org.apache.heron.streamlet.impl.StreamletImpl;
 import org.apache.heron.streamlet.impl.groupings.ReduceByKeyAndWindowCustomGrouping;
 import org.apache.heron.streamlet.impl.operators.ReduceByKeyOperator;
@@ -36,7 +36,7 @@
  * ReduceByKeyAndWindowStreamlet's elements are of KeyValue type where the key is
  * KeyWindowInfo&lt;K&gt; type and the value is of type T.
  */
-public class ReduceByKeyStreamlet<R, K, T> extends KVStreamletImpl<K, T> {
+public class ReduceByKeyStreamlet<R, K, T> extends StreamletImpl<KeyValue<K, T>> {
   private StreamletImpl<R> parent;
   private SerializableFunction<R, K> keyExtractor;
   private SerializableFunction<R, T> valueExtractor;
diff --git a/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala b/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala
index ea68949242..fe6b7df843 100644
--- a/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala
+++ b/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala
@@ -35,7 +35,6 @@ import org.apache.heron.streamlet.{
   WindowConfig
 }
 import org.apache.heron.streamlet.impl.{
-  KVStreamletImpl => JavaKVStreamletImpl,
   StreamletImpl => JavaStreamletImpl
 }
 import org.apache.heron.streamlet.impl.streamlets.SupplierStreamlet
diff --git a/heron/api/tests/java/BUILD b/heron/api/tests/java/BUILD
index d8efbdf03c..e38069e7aa 100644
--- a/heron/api/tests/java/BUILD
+++ b/heron/api/tests/java/BUILD
@@ -32,6 +32,8 @@ java_tests(
     "org.apache.heron.streamlet.impl.operators.KeyByOperatorTest",
     "org.apache.heron.streamlet.impl.operators.ReduceByKeyAndWindowOperatorTest",
     "org.apache.heron.streamlet.impl.operators.GeneralReduceByKeyAndWindowOperatorTest",
+    "org.apache.heron.streamlet.impl.streamlets.KVStreamletShadowTest",
+    "org.apache.heron.streamlet.impl.streamlets.StreamletShadowTest",
     "org.apache.heron.streamlet.impl.utils.StreamletUtilsTest",
     "org.apache.heron.api.ConfigTest",
     "org.apache.heron.api.HeronSubmitterTest",
diff --git a/heron/api/tests/java/org/apache/heron/streamlet/impl/KVStreamletShadowTest.java b/heron/api/tests/java/org/apache/heron/streamlet/impl/KVStreamletShadowTest.java
new file mode 100644
index 0000000000..964959e14b
--- /dev/null
+++ b/heron/api/tests/java/org/apache/heron/streamlet/impl/KVStreamletShadowTest.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.heron.streamlet.impl.streamlets;
+
+import org.junit.Test;
+import org.mockito.Mock;
+
+import org.apache.heron.streamlet.KeyValue;
+import org.apache.heron.streamlet.impl.StreamletImpl;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Unit tests for {@link KVStreamletShadow}
+ */
+public class KVStreamletShadowTest {
+  @Mock
+  private StreamletImpl<KeyValue<String, Double>> mockReal = mock(StreamletImpl.class);
+
+  @Mock
+  private StreamletImpl<Double> mockChild = mock(StreamletImpl.class);
+
+  @Test
+  public void testConstruction() {
+    doReturn("real_name").when(mockReal).getName();
+    doReturn(1).when(mockReal).getNumPartitions();
+    doNothing().when(mockReal).addChild(mockChild);
+    doReturn(mockReal).when(mockReal).setName("shadow_name");
+    doReturn(mockReal).when(mockReal).setNumPartitions(2);
+    doReturn("real_stream").when(mockReal).getStreamId();
+
+    KVStreamletShadow<String, Double> shadow = new KVStreamletShadow(mockReal);
+    assertEquals(shadow.getName(), "real_name");
+    assertEquals(shadow.getNumPartitions(), 1);
+
+    // Set name/partition should be applied to the real object
+    KVStreamletShadow<String, Double> shadow2 = new KVStreamletShadow(mockReal)
+        .setName("shadow_name")
+        .setNumPartitions(2);
+
+    verify(mockReal, times(1)).setName("shadow_name");
+    verify(mockReal, times(1)).setNumPartitions(2);
+
+    // addChild call should be forwarded to the real object
+    verify(mockReal, never()).addChild(mockChild);
+    shadow.addChild(mockChild);
+    shadow2.addChild(mockChild);
+    verify(mockReal, times(2)).addChild(mockChild);
+  }
+}
diff --git a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
index fdb860c549..345e417465 100644
--- a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
+++ b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
@@ -61,6 +61,7 @@
 import org.apache.heron.streamlet.impl.streamlets.FlatMapStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.GeneralReduceByKeyStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.JoinStreamlet;
+import org.apache.heron.streamlet.impl.streamlets.KVStreamletShadow;
 import org.apache.heron.streamlet.impl.streamlets.KeyByStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.MapStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.ReduceByKeyAndWindowStreamlet;
@@ -363,15 +364,16 @@ public void testCustomStreamletFromWindowBolt() {
   @Test
   public void testKeyByStreamlet() {
     Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random());
-    KVStreamlet<Long, Double> kvStream = baseStreamlet.keyBy(x -> Math.round(x));
+    KVStreamlet<Long, Double> streamlet = baseStreamlet.keyBy(x -> Math.round(x));
 
-    assertTrue(kvStream instanceof KeyByStreamlet);
-    KeyByStreamlet<Double, Long, Double> mStreamlet =
-        (KeyByStreamlet<Double, Long, Double>) kvStream;
+    assertTrue(streamlet instanceof KVStreamletShadow);
+    KVStreamletShadow<Long, Double> mStreamlet =
+        (KVStreamletShadow<Long, Double>) streamlet;
+    assertTrue(mStreamlet.getReal() instanceof KeyByStreamlet);
     assertEquals(1, mStreamlet.getNumPartitions());
     SupplierStreamlet<Double> supplierStreamlet = (SupplierStreamlet<Double>) baseStreamlet;
     assertEquals(supplierStreamlet.getChildren().size(), 1);
-    assertEquals(supplierStreamlet.getChildren().get(0), kvStream);
+    assertEquals(supplierStreamlet.getChildren().get(0), mStreamlet.getReal());
   }
 
   @Test
@@ -383,13 +385,14 @@ public void testReduceByKeyStreamlet() {
             x -> x,
             StreamletReducers::sum);
 
-    assertTrue(streamlet instanceof ReduceByKeyStreamlet);
-    ReduceByKeyStreamlet<Double, String, Double> mStreamlet =
-        (ReduceByKeyStreamlet<Double, String, Double>) streamlet;
+    assertTrue(streamlet instanceof KVStreamletShadow);
+    KVStreamletShadow<String, Double> mStreamlet =
+        (KVStreamletShadow<String, Double>) streamlet;
+    assertTrue(mStreamlet.getReal() instanceof ReduceByKeyStreamlet);
     assertEquals(20, mStreamlet.getNumPartitions());
     SupplierStreamlet<Double> supplierStreamlet = (SupplierStreamlet<Double>) baseStreamlet;
     assertEquals(supplierStreamlet.getChildren().size(), 1);
-    assertEquals(supplierStreamlet.getChildren().get(0), streamlet);
+    assertEquals(supplierStreamlet.getChildren().get(0), mStreamlet.getReal());
   }
 
   @Test
@@ -401,13 +404,14 @@ public void testGeneralReduceByKeyStreamlet() {
             0.0,
             StreamletReducers::sum);
 
-    assertTrue(streamlet instanceof GeneralReduceByKeyStreamlet);
-    GeneralReduceByKeyStreamlet<Double, String, Double> mStreamlet =
-        (GeneralReduceByKeyStreamlet<Double, String, Double>) streamlet;
+    assertTrue(streamlet instanceof KVStreamletShadow);
+    KVStreamletShadow<String, Double> mStreamlet =
+        (KVStreamletShadow<String, Double>) streamlet;
+    assertTrue(mStreamlet.getReal() instanceof GeneralReduceByKeyStreamlet);
     assertEquals(20, mStreamlet.getNumPartitions());
     SupplierStreamlet<Double> supplierStreamlet = (SupplierStreamlet<Double>) baseStreamlet;
     assertEquals(supplierStreamlet.getChildren().size(), 1);
-    assertEquals(supplierStreamlet.getChildren().get(0), streamlet);
+    assertEquals(supplierStreamlet.getChildren().get(0), mStreamlet.getReal());
   }
 
   @Test
@@ -417,13 +421,14 @@ public void testCountByKeyStreamlet() {
     KVStreamlet<String, Long> streamlet = baseStreamlet.setNumPartitions(20)
         .countByKey(x -> (x > 0) ? "positive" : ((x < 0) ? "negative" : "zero"));
 
-    assertTrue(streamlet instanceof CountByKeyStreamlet);
-    CountByKeyStreamlet<Double, String> mStreamlet =
-        (CountByKeyStreamlet<Double, String>) streamlet;
+    assertTrue(streamlet instanceof KVStreamletShadow);
+    KVStreamletShadow<String, Long> mStreamlet =
+        (KVStreamletShadow<String, Long>) streamlet;
+    assertTrue(mStreamlet.getReal() instanceof CountByKeyStreamlet);
     assertEquals(20, mStreamlet.getNumPartitions());
     SupplierStreamlet<Double> supplierStreamlet = (SupplierStreamlet<Double>) baseStreamlet;
     assertEquals(supplierStreamlet.getChildren().size(), 1);
-    assertEquals(supplierStreamlet.getChildren().get(0), streamlet);
+    assertEquals(supplierStreamlet.getChildren().get(0), mStreamlet.getReal());
   }
 
   @Test
@@ -434,13 +439,14 @@ public void testCountByKeyAndWindowStreamlet() {
         .countByKeyAndWindow(x -> (x > 0) ? "positive" : ((x < 0) ? "negative" : "zero"),
                              WindowConfig.TumblingCountWindow(10));
 
-    assertTrue(streamlet instanceof CountByKeyAndWindowStreamlet);
-    CountByKeyAndWindowStreamlet<Double, String> mStreamlet =
-        (CountByKeyAndWindowStreamlet<Double, String>) streamlet;
+    assertTrue(streamlet instanceof KVStreamletShadow);
+    KVStreamletShadow<KeyedWindow<String>, Long> mStreamlet =
+        (KVStreamletShadow<KeyedWindow<String>, Long>) streamlet;
+    assertTrue(mStreamlet.getReal() instanceof CountByKeyAndWindowStreamlet);
     assertEquals(20, mStreamlet.getNumPartitions());
     SupplierStreamlet<Double> supplierStreamlet = (SupplierStreamlet<Double>) baseStreamlet;
     assertEquals(supplierStreamlet.getChildren().size(), 1);
-    assertEquals(supplierStreamlet.getChildren().get(0), streamlet);
+    assertEquals(supplierStreamlet.getChildren().get(0), mStreamlet.getReal());
   }
 
   @Test
diff --git a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletShadowTest.java b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletShadowTest.java
index fbbd803c1a..fe7ecbe072 100644
--- a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletShadowTest.java
+++ b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletShadowTest.java
@@ -18,8 +18,6 @@
  */
 package org.apache.heron.streamlet.impl.streamlets;
 
-import java.util.ArrayList;
-
 import org.junit.Test;
 import org.mockito.Mock;
 
@@ -28,6 +26,7 @@
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -37,17 +36,18 @@
  */
 public class StreamletShadowTest {
   @Mock
-  private StreamletImpl<Double> mockReal;
+  private StreamletImpl<Double> mockReal = mock(StreamletImpl.class);
 
   @Mock
-  private StreamletImpl<Double> mockChild;
+  private StreamletImpl<Double> mockChild = mock(StreamletImpl.class);
 
   @Test
   public void testConstruction() {
     doReturn("real_name").when(mockReal).getName();
     doReturn(1).when(mockReal).getNumPartitions();
     doNothing().when(mockReal).addChild(mockChild);
-    doReturn(new ArrayList<StreamletImpl<Double>>()).when(mockReal).getChildren();
+    doReturn(mockReal).when(mockReal).setName("shadow_name");
+    doReturn(mockReal).when(mockReal).setNumPartitions(2);
     doReturn("real_stream").when(mockReal).getStreamId();
 
     StreamletShadow<Double> shadow = new StreamletShadow(mockReal);
@@ -62,9 +62,14 @@ public String getStreamId() {
         return "shadow_stream";
       }
     };
-    assertEquals(shadow.getName(), "real_name");
-    assertEquals(shadow.getNumPartitions(), 1);
-    assertEquals(shadow.getStreamId(), "shadow_stream");
+    assertEquals(shadow2.getName(), "real_name");
+    assertEquals(shadow2.getNumPartitions(), 1);
+    assertEquals(shadow2.getStreamId(), "shadow_stream");
+
+    // Set name/partition should be applied to the real object
+    shadow.setName("shadow_name").setNumPartitions(2);
+    verify(mockReal, times(1)).setName("shadow_name");
+    verify(mockReal, times(1)).setNumPartitions(2);
 
     // addChild call should be forwarded to the real object
     verify(mockReal, never()).addChild(mockChild);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services