You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by nw...@apache.org on 2019/01/07 19:36:16 UTC

[incubator-heron] branch master updated: Refactor KVStreamlet to make it lighter and easier to convert from Streamlet (#3135)

This is an automated email from the ASF dual-hosted git repository.

nwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git


The following commit(s) were added to refs/heads/master by this push:
     new 0ec33b9  Refactor KVStreamlet to make it lighter and easier to convert from Streamlet (#3135)
0ec33b9 is described below

commit 0ec33b9685caa50a18aa2f7441503595e2b9cfa5
Author: Ning Wang <nw...@twitter.com>
AuthorDate: Mon Jan 7 11:36:10 2019 -0800

    Refactor KVStreamlet to make it lighter and easier to convert from Streamlet (#3135)
---
 .../heron/streamlet/impl/KVStreamletImpl.java      | 30 -----------
 .../heron/streamlet/impl/KVStreamletShadow.java    | 62 ++++++++++++++++++++++
 .../apache/heron/streamlet/impl/StreamletImpl.java | 17 +++---
 .../heron/streamlet/impl/StreamletShadow.java      | 16 ++++++
 .../streamlets/CountByKeyAndWindowStreamlet.java   |  4 +-
 .../impl/streamlets/CountByKeyStreamlet.java       |  4 +-
 .../GeneralReduceByKeyAndWindowStreamlet.java      |  4 +-
 .../streamlets/GeneralReduceByKeyStreamlet.java    |  4 +-
 .../streamlet/impl/streamlets/JoinStreamlet.java   |  4 +-
 .../streamlet/impl/streamlets/KeyByStreamlet.java  |  4 +-
 .../streamlets/ReduceByKeyAndWindowStreamlet.java  |  4 +-
 .../impl/streamlets/ReduceByKeyStreamlet.java      |  4 +-
 .../heron/streamlet/scala/impl/StreamletImpl.scala |  1 -
 heron/api/tests/java/BUILD                         |  2 +
 ...tShadowTest.java => KVStreamletShadowTest.java} | 33 ++++++------
 .../heron/streamlet/impl/StreamletImplTest.java    | 48 +++++++++--------
 .../heron/streamlet/impl/StreamletShadowTest.java  | 21 +++++---
 17 files changed, 160 insertions(+), 102 deletions(-)

diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/KVStreamletImpl.java b/heron/api/src/java/org/apache/heron/streamlet/impl/KVStreamletImpl.java
deleted file mode 100644
index e9590ac..0000000
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/KVStreamletImpl.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.heron.streamlet.impl;
-
-import org.apache.heron.streamlet.KVStreamlet;
-import org.apache.heron.streamlet.KeyValue;
-
-/**
- * A KVStreamlet is a Streamlet with KeyValue data.
- */
-public abstract class KVStreamletImpl<K, V>
-    extends StreamletImpl<KeyValue<K, V>>
-    implements KVStreamlet<K, V> {
-}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/KVStreamletShadow.java b/heron/api/src/java/org/apache/heron/streamlet/impl/KVStreamletShadow.java
new file mode 100644
index 0000000..bed7e92
--- /dev/null
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/KVStreamletShadow.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.streamlet.impl.streamlets;
+
+import org.apache.heron.streamlet.KVStreamlet;
+import org.apache.heron.streamlet.KeyValue;
+import org.apache.heron.streamlet.impl.StreamletImpl;
+
+/**
+ * KVStreamletShadow is a decorator for StreamletImpl<KeyValue<?, ?>> objects.
+ * Please check StreamletShadow comments for more details.
+ *
+ * Usage:
+ * To create a shadow object from an existing
+ * StreamletImpl<KeyValue<K, V>> object(stream):
+ *
+ * KVStreamlet<K, V> kv = new KVStreamletShadow<K, V>(stream)
+ *
+ */
+public class KVStreamletShadow<K, V>
+    extends StreamletShadow<KeyValue<K, V>>
+    implements KVStreamlet<K, V> {
+
+  // Note that KVStreamletShadow is constructed from StreamletImpl
+  public KVStreamletShadow(StreamletImpl<KeyValue<K, V>> real) {
+    super(real);
+  }
+
+  /*
+   * Functions accessible by child objects need to be overridden (forwarding the call to
+   * the real object since shadow object shouldn't have them)
+   */
+  @Override
+  public KVStreamletShadow<K, V> setName(String sName) {
+    super.setName(sName);
+    return this;
+  }
+
+  @Override
+  public KVStreamletShadow<K, V> setNumPartitions(int numPartitions) {
+    super.setNumPartitions(numPartitions);
+    return this;
+  }
+
+}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
index 3afa549..81a7d78 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
@@ -53,6 +53,7 @@ import org.apache.heron.streamlet.impl.streamlets.FlatMapStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.GeneralReduceByKeyAndWindowStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.GeneralReduceByKeyStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.JoinStreamlet;
+import org.apache.heron.streamlet.impl.streamlets.KVStreamletShadow;
 import org.apache.heron.streamlet.impl.streamlets.KeyByStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.LogStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.MapStreamlet;
@@ -450,7 +451,7 @@ public abstract class StreamletImpl<R> implements Streamlet<R> {
         this, joinee, thisKeyExtractor, otherKeyExtractor, windowCfg, joinType, joinFunction);
     addChild(retval);
     joinee.addChild(retval);
-    return retval;
+    return new KVStreamletShadow<KeyedWindow<K>, T>(retval);
   }
 
   /**
@@ -471,7 +472,7 @@ public abstract class StreamletImpl<R> implements Streamlet<R> {
     ReduceByKeyStreamlet<R, K, T> retval =
         new ReduceByKeyStreamlet<>(this, keyExtractor, valueExtractor, reduceFn);
     addChild(retval);
-    return retval;
+    return new KVStreamletShadow<K, T>(retval);
   }
 
   /**
@@ -491,7 +492,7 @@ public abstract class StreamletImpl<R> implements Streamlet<R> {
     GeneralReduceByKeyStreamlet<R, K, T> retval =
         new GeneralReduceByKeyStreamlet<>(this, keyExtractor, identity, reduceFn);
     addChild(retval);
-    return retval;
+    return new KVStreamletShadow<K, T>(retval);
   }
 
   /**
@@ -517,7 +518,7 @@ public abstract class StreamletImpl<R> implements Streamlet<R> {
         new ReduceByKeyAndWindowStreamlet<>(this, keyExtractor, valueExtractor,
             windowCfg, reduceFn);
     addChild(retval);
-    return retval;
+    return new KVStreamletShadow<KeyedWindow<K>, T>(retval);
   }
 
   /**
@@ -546,7 +547,7 @@ public abstract class StreamletImpl<R> implements Streamlet<R> {
         new GeneralReduceByKeyAndWindowStreamlet<>(this, keyExtractor, windowCfg,
             identity, reduceFn);
     addChild(retval);
-    return retval;
+    return new KVStreamletShadow<KeyedWindow<K>, T>(retval);
   }
 
   /**
@@ -691,7 +692,7 @@ public abstract class StreamletImpl<R> implements Streamlet<R> {
     KeyByStreamlet<R, K, V> retval =
         new KeyByStreamlet<R, K, V>(this, keyExtractor, valueExtractor);
     addChild(retval);
-    return retval;
+    return new KVStreamletShadow<K, V>(retval);
   }
 
   /**
@@ -705,7 +706,7 @@ public abstract class StreamletImpl<R> implements Streamlet<R> {
 
     CountByKeyStreamlet<R, K> retval = new CountByKeyStreamlet<>(this, keyExtractor);
     addChild(retval);
-    return retval;
+    return new KVStreamletShadow<K, Long>(retval);
   }
 
 
@@ -725,6 +726,6 @@ public abstract class StreamletImpl<R> implements Streamlet<R> {
     CountByKeyAndWindowStreamlet<R, K> retval =
         new CountByKeyAndWindowStreamlet<>(this, keyExtractor, windowCfg);
     addChild(retval);
-    return retval;
+    return new KVStreamletShadow<KeyedWindow<K>, Long>(retval);
   }
 }
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletShadow.java b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletShadow.java
index fad23de..26e7eb6 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletShadow.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletShadow.java
@@ -58,6 +58,10 @@ public class StreamletShadow<R> extends StreamletImpl<R> {
     this.real = real;
   }
 
+  public StreamletImpl<R> getReal() {
+    return real;
+  }
+
   /**
    * Gets the stream id of this Streamlet.
    * @return the stream id of this Streamlet
@@ -72,11 +76,23 @@ public class StreamletShadow<R> extends StreamletImpl<R> {
    * the real object since shadow object shouldn't have them)
    */
   @Override
+  public StreamletShadow<R> setName(String sName) {
+    real.setName(sName);
+    return this;
+  }
+
+  @Override
   public String getName() {
     return real.getName();
   }
 
   @Override
+  public StreamletShadow<R> setNumPartitions(int numPartitions) {
+    real.setNumPartitions(numPartitions);
+    return this;
+  }
+
+  @Override
   public int getNumPartitions() {
     return real.getNumPartitions();
   }
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CountByKeyAndWindowStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CountByKeyAndWindowStreamlet.java
index 4a19e2a..698c478 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CountByKeyAndWindowStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CountByKeyAndWindowStreamlet.java
@@ -23,11 +23,11 @@ package org.apache.heron.streamlet.impl.streamlets;
 import java.util.Set;
 
 import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.streamlet.KeyValue;
 import org.apache.heron.streamlet.KeyedWindow;
 import org.apache.heron.streamlet.SerializableFunction;
 import org.apache.heron.streamlet.StreamletReducers;
 import org.apache.heron.streamlet.WindowConfig;
-import org.apache.heron.streamlet.impl.KVStreamletImpl;
 import org.apache.heron.streamlet.impl.StreamletImpl;
 import org.apache.heron.streamlet.impl.groupings.ReduceByKeyAndWindowCustomGrouping;
 import org.apache.heron.streamlet.impl.operators.ReduceByKeyAndWindowOperator;
@@ -39,7 +39,7 @@ import org.apache.heron.streamlet.impl.operators.ReduceByKeyAndWindowOperator;
  * KeyWindowInfo&lt;K&gt; type and the value is Long.
  */
 public class CountByKeyAndWindowStreamlet<R, K>
-    extends KVStreamletImpl<KeyedWindow<K>, Long> {
+    extends StreamletImpl<KeyValue<KeyedWindow<K>, Long>> {
   private StreamletImpl<R> parent;
   private SerializableFunction<R, K> keyExtractor;
   private WindowConfig windowCfg;
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CountByKeyStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CountByKeyStreamlet.java
index 5c8328b..f339ef2 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CountByKeyStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CountByKeyStreamlet.java
@@ -23,9 +23,9 @@ package org.apache.heron.streamlet.impl.streamlets;
 import java.util.Set;
 
 import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.streamlet.KeyValue;
 import org.apache.heron.streamlet.SerializableFunction;
 import org.apache.heron.streamlet.StreamletReducers;
-import org.apache.heron.streamlet.impl.KVStreamletImpl;
 import org.apache.heron.streamlet.impl.StreamletImpl;
 import org.apache.heron.streamlet.impl.groupings.ReduceByKeyAndWindowCustomGrouping;
 import org.apache.heron.streamlet.impl.operators.ReduceByKeyOperator;
@@ -38,7 +38,7 @@ import org.apache.heron.streamlet.impl.operators.ReduceByKeyOperator;
  * ReduceByKeyAndWindowStreamlet's elements are of KeyValue type where the key is
  * KeyWindowInfo&lt;K&gt; type and the value is of type V.
  */
-public class CountByKeyStreamlet<R, K> extends KVStreamletImpl<K, Long> {
+public class CountByKeyStreamlet<R, K> extends StreamletImpl<KeyValue<K, Long>> {
   private StreamletImpl<R> parent;
   private SerializableFunction<R, K> keyExtractor;
 
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/GeneralReduceByKeyAndWindowStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/GeneralReduceByKeyAndWindowStreamlet.java
index c2b8747..f95d7d2 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/GeneralReduceByKeyAndWindowStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/GeneralReduceByKeyAndWindowStreamlet.java
@@ -23,11 +23,11 @@ package org.apache.heron.streamlet.impl.streamlets;
 import java.util.Set;
 
 import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.streamlet.KeyValue;
 import org.apache.heron.streamlet.KeyedWindow;
 import org.apache.heron.streamlet.SerializableBiFunction;
 import org.apache.heron.streamlet.SerializableFunction;
 import org.apache.heron.streamlet.WindowConfig;
-import org.apache.heron.streamlet.impl.KVStreamletImpl;
 import org.apache.heron.streamlet.impl.StreamletImpl;
 import org.apache.heron.streamlet.impl.groupings.ReduceByKeyAndWindowCustomGrouping;
 import org.apache.heron.streamlet.impl.operators.GeneralReduceByKeyAndWindowOperator;
@@ -40,7 +40,7 @@ import org.apache.heron.streamlet.impl.operators.GeneralReduceByKeyAndWindowOper
  * KeyWindowInfo&lt;K&gt; type and the value is of type T.
  */
 public class GeneralReduceByKeyAndWindowStreamlet<R, K, T>
-    extends KVStreamletImpl<KeyedWindow<K>, T> {
+    extends StreamletImpl<KeyValue<KeyedWindow<K>, T>> {
   private StreamletImpl<R> parent;
   private SerializableFunction<R, K> keyExtractor;
   private WindowConfig windowCfg;
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/GeneralReduceByKeyStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/GeneralReduceByKeyStreamlet.java
index 9b4d2fe..277a1e8 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/GeneralReduceByKeyStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/GeneralReduceByKeyStreamlet.java
@@ -23,9 +23,9 @@ package org.apache.heron.streamlet.impl.streamlets;
 import java.util.Set;
 
 import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.streamlet.KeyValue;
 import org.apache.heron.streamlet.SerializableBiFunction;
 import org.apache.heron.streamlet.SerializableFunction;
-import org.apache.heron.streamlet.impl.KVStreamletImpl;
 import org.apache.heron.streamlet.impl.StreamletImpl;
 import org.apache.heron.streamlet.impl.groupings.ReduceByKeyAndWindowCustomGrouping;
 import org.apache.heron.streamlet.impl.operators.GeneralReduceByKeyOperator;
@@ -36,7 +36,7 @@ import org.apache.heron.streamlet.impl.operators.GeneralReduceByKeyOperator;
  * GeneralReduceByKeyStreamlet's elements are of KeyValue type where the key is
  * KeyWindowInfo&lt;K&gt; type and the value is of type T.
  */
-public class GeneralReduceByKeyStreamlet<R, K, T> extends KVStreamletImpl<K, T> {
+public class GeneralReduceByKeyStreamlet<R, K, T> extends StreamletImpl<KeyValue<K, T>> {
   private StreamletImpl<R> parent;
   private SerializableFunction<R, K> keyExtractor;
   private T identity;
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/JoinStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/JoinStreamlet.java
index ce03f97..40f76ef 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/JoinStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/JoinStreamlet.java
@@ -24,11 +24,11 @@ import java.util.Set;
 
 import org.apache.heron.api.topology.TopologyBuilder;
 import org.apache.heron.streamlet.JoinType;
+import org.apache.heron.streamlet.KeyValue;
 import org.apache.heron.streamlet.KeyedWindow;
 import org.apache.heron.streamlet.SerializableBiFunction;
 import org.apache.heron.streamlet.SerializableFunction;
 import org.apache.heron.streamlet.WindowConfig;
-import org.apache.heron.streamlet.impl.KVStreamletImpl;
 import org.apache.heron.streamlet.impl.StreamletImpl;
 import org.apache.heron.streamlet.impl.groupings.JoinCustomGrouping;
 import org.apache.heron.streamlet.impl.operators.JoinOperator;
@@ -40,7 +40,7 @@ import org.apache.heron.streamlet.impl.operators.JoinOperator;
  * JoinStreamlet's elements are of KeyValue type where the key is KeyWindowInfo&lt;K&gt; type
  * and the value is of type VR.
  */
-public final class JoinStreamlet<K, R, S, T> extends KVStreamletImpl<KeyedWindow<K>, T> {
+public final class JoinStreamlet<K, R, S, T> extends StreamletImpl<KeyValue<KeyedWindow<K>, T>> {
   private JoinType joinType;
   private StreamletImpl<R> left;
   private StreamletImpl<S> right;
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/KeyByStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/KeyByStreamlet.java
index df76eab..1c1eb16 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/KeyByStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/KeyByStreamlet.java
@@ -23,8 +23,8 @@ package org.apache.heron.streamlet.impl.streamlets;
 import java.util.Set;
 
 import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.streamlet.KeyValue;
 import org.apache.heron.streamlet.SerializableFunction;
-import org.apache.heron.streamlet.impl.KVStreamletImpl;
 import org.apache.heron.streamlet.impl.StreamletImpl;
 import org.apache.heron.streamlet.impl.operators.KeyByOperator;
 
@@ -32,7 +32,7 @@ import org.apache.heron.streamlet.impl.operators.KeyByOperator;
  * KeyByStreamlet represents a KVStreamlet that is the result of applying key and value extractors
  * on all elements.
  */
-public class KeyByStreamlet<R, K, V> extends KVStreamletImpl<K, V> {
+public class KeyByStreamlet<R, K, V> extends StreamletImpl<KeyValue<K, V>> {
   private StreamletImpl<R> parent;
   private SerializableFunction<R, K> keyExtractor;
   private SerializableFunction<R, V> valueExtractor;
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ReduceByKeyAndWindowStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ReduceByKeyAndWindowStreamlet.java
index 2c399f3..18fea4d 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ReduceByKeyAndWindowStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ReduceByKeyAndWindowStreamlet.java
@@ -23,11 +23,11 @@ package org.apache.heron.streamlet.impl.streamlets;
 import java.util.Set;
 
 import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.streamlet.KeyValue;
 import org.apache.heron.streamlet.KeyedWindow;
 import org.apache.heron.streamlet.SerializableBinaryOperator;
 import org.apache.heron.streamlet.SerializableFunction;
 import org.apache.heron.streamlet.WindowConfig;
-import org.apache.heron.streamlet.impl.KVStreamletImpl;
 import org.apache.heron.streamlet.impl.StreamletImpl;
 import org.apache.heron.streamlet.impl.groupings.ReduceByKeyAndWindowCustomGrouping;
 import org.apache.heron.streamlet.impl.operators.ReduceByKeyAndWindowOperator;
@@ -40,7 +40,7 @@ import org.apache.heron.streamlet.impl.operators.ReduceByKeyAndWindowOperator;
  * KeyWindowInfo&lt;K&gt; type and the value is of type T.
  */
 public class ReduceByKeyAndWindowStreamlet<R, K, T>
-    extends KVStreamletImpl<KeyedWindow<K>, T> {
+    extends StreamletImpl<KeyValue<KeyedWindow<K>, T>> {
   private StreamletImpl<R> parent;
   private SerializableFunction<R, K> keyExtractor;
   private SerializableFunction<R, T> valueExtractor;
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ReduceByKeyStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ReduceByKeyStreamlet.java
index ea84687..9176618 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ReduceByKeyStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ReduceByKeyStreamlet.java
@@ -23,9 +23,9 @@ package org.apache.heron.streamlet.impl.streamlets;
 import java.util.Set;
 
 import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.streamlet.KeyValue;
 import org.apache.heron.streamlet.SerializableBinaryOperator;
 import org.apache.heron.streamlet.SerializableFunction;
-import org.apache.heron.streamlet.impl.KVStreamletImpl;
 import org.apache.heron.streamlet.impl.StreamletImpl;
 import org.apache.heron.streamlet.impl.groupings.ReduceByKeyAndWindowCustomGrouping;
 import org.apache.heron.streamlet.impl.operators.ReduceByKeyOperator;
@@ -36,7 +36,7 @@ import org.apache.heron.streamlet.impl.operators.ReduceByKeyOperator;
  * ReduceByKeyAndWindowStreamlet's elements are of KeyValue type where the key is
  * KeyWindowInfo&lt;K&gt; type and the value is of type T.
  */
-public class ReduceByKeyStreamlet<R, K, T> extends KVStreamletImpl<K, T> {
+public class ReduceByKeyStreamlet<R, K, T> extends StreamletImpl<KeyValue<K, T>> {
   private StreamletImpl<R> parent;
   private SerializableFunction<R, K> keyExtractor;
   private SerializableFunction<R, T> valueExtractor;
diff --git a/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala b/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala
index ea68949..fe6b7df 100644
--- a/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala
+++ b/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala
@@ -35,7 +35,6 @@ import org.apache.heron.streamlet.{
   WindowConfig
 }
 import org.apache.heron.streamlet.impl.{
-  KVStreamletImpl => JavaKVStreamletImpl,
   StreamletImpl => JavaStreamletImpl
 }
 import org.apache.heron.streamlet.impl.streamlets.SupplierStreamlet
diff --git a/heron/api/tests/java/BUILD b/heron/api/tests/java/BUILD
index d8efbdf..e38069e 100644
--- a/heron/api/tests/java/BUILD
+++ b/heron/api/tests/java/BUILD
@@ -32,6 +32,8 @@ java_tests(
     "org.apache.heron.streamlet.impl.operators.KeyByOperatorTest",
     "org.apache.heron.streamlet.impl.operators.ReduceByKeyAndWindowOperatorTest",
     "org.apache.heron.streamlet.impl.operators.GeneralReduceByKeyAndWindowOperatorTest",
+    "org.apache.heron.streamlet.impl.streamlets.KVStreamletShadowTest",
+    "org.apache.heron.streamlet.impl.streamlets.StreamletShadowTest",
     "org.apache.heron.streamlet.impl.utils.StreamletUtilsTest",
     "org.apache.heron.api.ConfigTest",
     "org.apache.heron.api.HeronSubmitterTest",
diff --git a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletShadowTest.java b/heron/api/tests/java/org/apache/heron/streamlet/impl/KVStreamletShadowTest.java
similarity index 70%
copy from heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletShadowTest.java
copy to heron/api/tests/java/org/apache/heron/streamlet/impl/KVStreamletShadowTest.java
index fbbd803..964959e 100644
--- a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletShadowTest.java
+++ b/heron/api/tests/java/org/apache/heron/streamlet/impl/KVStreamletShadowTest.java
@@ -18,16 +18,16 @@
  */
 package org.apache.heron.streamlet.impl.streamlets;
 
-import java.util.ArrayList;
-
 import org.junit.Test;
 import org.mockito.Mock;
 
+import org.apache.heron.streamlet.KeyValue;
 import org.apache.heron.streamlet.impl.StreamletImpl;
 
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -35,36 +35,33 @@ import static org.mockito.Mockito.verify;
 /**
  * Unit tests for {@link StreamletShadow}
  */
-public class StreamletShadowTest {
+public class KVStreamletShadowTest {
   @Mock
-  private StreamletImpl<Double> mockReal;
+  private StreamletImpl<KeyValue<String, Double>> mockReal = mock(StreamletImpl.class);
 
   @Mock
-  private StreamletImpl<Double> mockChild;
+  private StreamletImpl<Double> mockChild = mock(StreamletImpl.class);
 
   @Test
   public void testConstruction() {
     doReturn("real_name").when(mockReal).getName();
     doReturn(1).when(mockReal).getNumPartitions();
     doNothing().when(mockReal).addChild(mockChild);
-    doReturn(new ArrayList<StreamletImpl<Double>>()).when(mockReal).getChildren();
+    doReturn(mockReal).when(mockReal).setName("shadow_name");
+    doReturn(mockReal).when(mockReal).setNumPartitions(2);
     doReturn("real_stream").when(mockReal).getStreamId();
 
-    StreamletShadow<Double> shadow = new StreamletShadow(mockReal);
+    KVStreamletShadow<String, Double> shadow = new KVStreamletShadow(mockReal);
     assertEquals(shadow.getName(), "real_name");
     assertEquals(shadow.getNumPartitions(), 1);
-    assertEquals(shadow.getStreamId(), "real_stream");
 
-    // set a different stream id
-    StreamletShadow<Double> shadow2 = new StreamletShadow(mockReal) {
-      @Override
-      public String getStreamId() {
-        return "shadow_stream";
-      }
-    };
-    assertEquals(shadow.getName(), "real_name");
-    assertEquals(shadow.getNumPartitions(), 1);
-    assertEquals(shadow.getStreamId(), "shadow_stream");
+    // Set name/partition should be applied to the real object
+    KVStreamletShadow<String, Double> shadow2 = new KVStreamletShadow(mockReal)
+        .setName("shadow_name")
+        .setNumPartitions(2);
+
+    verify(mockReal, times(1)).setName("shadow_name");
+    verify(mockReal, times(1)).setNumPartitions(2);
 
     // addChild call should be forwarded to the real object
     verify(mockReal, never()).addChild(mockChild);
diff --git a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
index fdb860c..345e417 100644
--- a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
+++ b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
@@ -61,6 +61,7 @@ import org.apache.heron.streamlet.impl.streamlets.FilterStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.FlatMapStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.GeneralReduceByKeyStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.JoinStreamlet;
+import org.apache.heron.streamlet.impl.streamlets.KVStreamletShadow;
 import org.apache.heron.streamlet.impl.streamlets.KeyByStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.MapStreamlet;
 import org.apache.heron.streamlet.impl.streamlets.ReduceByKeyAndWindowStreamlet;
@@ -363,15 +364,16 @@ public class StreamletImplTest {
   @Test
   public void testKeyByStreamlet() {
     Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random());
-    KVStreamlet<Long, Double> kvStream = baseStreamlet.keyBy(x -> Math.round(x));
+    KVStreamlet<Long, Double> streamlet = baseStreamlet.keyBy(x -> Math.round(x));
 
-    assertTrue(kvStream instanceof KeyByStreamlet);
-    KeyByStreamlet<Double, Long, Double> mStreamlet =
-        (KeyByStreamlet<Double, Long, Double>) kvStream;
+    assertTrue(streamlet instanceof KVStreamletShadow);
+    KVStreamletShadow<Long, Double> mStreamlet =
+        (KVStreamletShadow<Long, Double>) streamlet;
+    assertTrue(mStreamlet.getReal() instanceof KeyByStreamlet);
     assertEquals(1, mStreamlet.getNumPartitions());
     SupplierStreamlet<Double> supplierStreamlet = (SupplierStreamlet<Double>) baseStreamlet;
     assertEquals(supplierStreamlet.getChildren().size(), 1);
-    assertEquals(supplierStreamlet.getChildren().get(0), kvStream);
+    assertEquals(supplierStreamlet.getChildren().get(0), mStreamlet.getReal());
   }
 
   @Test
@@ -383,13 +385,14 @@ public class StreamletImplTest {
             x -> x,
             StreamletReducers::sum);
 
-    assertTrue(streamlet instanceof ReduceByKeyStreamlet);
-    ReduceByKeyStreamlet<Double, String, Double> mStreamlet =
-        (ReduceByKeyStreamlet<Double, String, Double>) streamlet;
+    assertTrue(streamlet instanceof KVStreamletShadow);
+    KVStreamletShadow<String, Double> mStreamlet =
+        (KVStreamletShadow<String, Double>) streamlet;
+    assertTrue(mStreamlet.getReal() instanceof ReduceByKeyStreamlet);
     assertEquals(20, mStreamlet.getNumPartitions());
     SupplierStreamlet<Double> supplierStreamlet = (SupplierStreamlet<Double>) baseStreamlet;
     assertEquals(supplierStreamlet.getChildren().size(), 1);
-    assertEquals(supplierStreamlet.getChildren().get(0), streamlet);
+    assertEquals(supplierStreamlet.getChildren().get(0), mStreamlet.getReal());
   }
 
   @Test
@@ -401,13 +404,14 @@ public class StreamletImplTest {
             0.0,
             StreamletReducers::sum);
 
-    assertTrue(streamlet instanceof GeneralReduceByKeyStreamlet);
-    GeneralReduceByKeyStreamlet<Double, String, Double> mStreamlet =
-        (GeneralReduceByKeyStreamlet<Double, String, Double>) streamlet;
+    assertTrue(streamlet instanceof KVStreamletShadow);
+    KVStreamletShadow<String, Double> mStreamlet =
+        (KVStreamletShadow<String, Double>) streamlet;
+    assertTrue(mStreamlet.getReal() instanceof GeneralReduceByKeyStreamlet);
     assertEquals(20, mStreamlet.getNumPartitions());
     SupplierStreamlet<Double> supplierStreamlet = (SupplierStreamlet<Double>) baseStreamlet;
     assertEquals(supplierStreamlet.getChildren().size(), 1);
-    assertEquals(supplierStreamlet.getChildren().get(0), streamlet);
+    assertEquals(supplierStreamlet.getChildren().get(0), mStreamlet.getReal());
   }
 
   @Test
@@ -417,13 +421,14 @@ public class StreamletImplTest {
     KVStreamlet<String, Long> streamlet = baseStreamlet.setNumPartitions(20)
         .countByKey(x -> (x > 0) ? "positive" : ((x < 0) ? "negative" : "zero"));
 
-    assertTrue(streamlet instanceof CountByKeyStreamlet);
-    CountByKeyStreamlet<Double, String> mStreamlet =
-        (CountByKeyStreamlet<Double, String>) streamlet;
+    assertTrue(streamlet instanceof KVStreamletShadow);
+    KVStreamletShadow<String, Long> mStreamlet =
+        (KVStreamletShadow<String, Long>) streamlet;
+    assertTrue(mStreamlet.getReal() instanceof CountByKeyStreamlet);
     assertEquals(20, mStreamlet.getNumPartitions());
     SupplierStreamlet<Double> supplierStreamlet = (SupplierStreamlet<Double>) baseStreamlet;
     assertEquals(supplierStreamlet.getChildren().size(), 1);
-    assertEquals(supplierStreamlet.getChildren().get(0), streamlet);
+    assertEquals(supplierStreamlet.getChildren().get(0), mStreamlet.getReal());
   }
 
   @Test
@@ -434,13 +439,14 @@ public class StreamletImplTest {
         .countByKeyAndWindow(x -> (x > 0) ? "positive" : ((x < 0) ? "negative" : "zero"),
                              WindowConfig.TumblingCountWindow(10));
 
-    assertTrue(streamlet instanceof CountByKeyAndWindowStreamlet);
-    CountByKeyAndWindowStreamlet<Double, String> mStreamlet =
-        (CountByKeyAndWindowStreamlet<Double, String>) streamlet;
+    assertTrue(streamlet instanceof KVStreamletShadow);
+    KVStreamletShadow<KeyedWindow<String>, Long> mStreamlet =
+        (KVStreamletShadow<KeyedWindow<String>, Long>) streamlet;
+    assertTrue(mStreamlet.getReal() instanceof CountByKeyAndWindowStreamlet);
     assertEquals(20, mStreamlet.getNumPartitions());
     SupplierStreamlet<Double> supplierStreamlet = (SupplierStreamlet<Double>) baseStreamlet;
     assertEquals(supplierStreamlet.getChildren().size(), 1);
-    assertEquals(supplierStreamlet.getChildren().get(0), streamlet);
+    assertEquals(supplierStreamlet.getChildren().get(0), mStreamlet.getReal());
   }
 
   @Test
diff --git a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletShadowTest.java b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletShadowTest.java
index fbbd803..fe7ecbe 100644
--- a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletShadowTest.java
+++ b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletShadowTest.java
@@ -18,8 +18,6 @@
  */
 package org.apache.heron.streamlet.impl.streamlets;
 
-import java.util.ArrayList;
-
 import org.junit.Test;
 import org.mockito.Mock;
 
@@ -28,6 +26,7 @@ import org.apache.heron.streamlet.impl.StreamletImpl;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -37,17 +36,18 @@ import static org.mockito.Mockito.verify;
  */
 public class StreamletShadowTest {
   @Mock
-  private StreamletImpl<Double> mockReal;
+  private StreamletImpl<Double> mockReal = mock(StreamletImpl.class);
 
   @Mock
-  private StreamletImpl<Double> mockChild;
+  private StreamletImpl<Double> mockChild = mock(StreamletImpl.class);
 
   @Test
   public void testConstruction() {
     doReturn("real_name").when(mockReal).getName();
     doReturn(1).when(mockReal).getNumPartitions();
     doNothing().when(mockReal).addChild(mockChild);
-    doReturn(new ArrayList<StreamletImpl<Double>>()).when(mockReal).getChildren();
+    doReturn(mockReal).when(mockReal).setName("shadow_name");
+    doReturn(mockReal).when(mockReal).setNumPartitions(2);
     doReturn("real_stream").when(mockReal).getStreamId();
 
     StreamletShadow<Double> shadow = new StreamletShadow(mockReal);
@@ -62,9 +62,14 @@ public class StreamletShadowTest {
         return "shadow_stream";
       }
     };
-    assertEquals(shadow.getName(), "real_name");
-    assertEquals(shadow.getNumPartitions(), 1);
-    assertEquals(shadow.getStreamId(), "shadow_stream");
+    assertEquals(shadow2.getName(), "real_name");
+    assertEquals(shadow2.getNumPartitions(), 1);
+    assertEquals(shadow2.getStreamId(), "shadow_stream");
+
+    // Set name/partition should be applied to the real object
+    shadow.setName("shadow_name").setNumPartitions(2);
+    verify(mockReal, times(1)).setName("shadow_name");
+    verify(mockReal, times(1)).setNumPartitions(2);
 
     // addChild call should be forwarded to the real object
     verify(mockReal, never()).addChild(mockChild);