You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by nw...@apache.org on 2019/01/07 19:36:16 UTC
[incubator-heron] branch master updated: Refactor KVStreamlet to
make it lighter and easier to convert from Streamlet (#3135)
This is an automated email from the ASF dual-hosted git repository.
nwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push:
new 0ec33b9 Refactor KVStreamlet to make it lighter and easier to convert from Streamlet (#3135)
0ec33b9 is described below
commit 0ec33b9685caa50a18aa2f7441503595e2b9cfa5
Author: Ning Wang <nw...@twitter.com>
AuthorDate: Mon Jan 7 11:36:10 2019 -0800
Refactor KVStreamlet to make it lighter and easier to convert from Streamlet (#3135)
---
.../heron/streamlet/impl/KVStreamletImpl.java | 30 -----------
.../heron/streamlet/impl/KVStreamletShadow.java | 62 ++++++++++++++++++++++
.../apache/heron/streamlet/impl/StreamletImpl.java | 17 +++---
.../heron/streamlet/impl/StreamletShadow.java | 16 ++++++
.../streamlets/CountByKeyAndWindowStreamlet.java | 4 +-
.../impl/streamlets/CountByKeyStreamlet.java | 4 +-
.../GeneralReduceByKeyAndWindowStreamlet.java | 4 +-
.../streamlets/GeneralReduceByKeyStreamlet.java | 4 +-
.../streamlet/impl/streamlets/JoinStreamlet.java | 4 +-
.../streamlet/impl/streamlets/KeyByStreamlet.java | 4 +-
.../streamlets/ReduceByKeyAndWindowStreamlet.java | 4 +-
.../impl/streamlets/ReduceByKeyStreamlet.java | 4 +-
.../heron/streamlet/scala/impl/StreamletImpl.scala | 1 -
heron/api/tests/java/BUILD | 2 +
...tShadowTest.java => KVStreamletShadowTest.java} | 33 ++++++------
.../heron/streamlet/impl/StreamletImplTest.java | 48 +++++++++--------
.../heron/streamlet/impl/StreamletShadowTest.java | 21 +++++---
17 files changed, 160 insertions(+), 102 deletions(-)
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/KVStreamletImpl.java b/heron/api/src/java/org/apache/heron/streamlet/impl/KVStreamletImpl.java
deleted file mode 100644
index e9590ac..0000000
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/KVStreamletImpl.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.heron.streamlet.impl;
-
-import org.apache.heron.streamlet.KVStreamlet;
-import org.apache.heron.streamlet.KeyValue;
-
-/**
- * A KVStreamlet is a Streamlet with KeyValue data.
- */
-public abstract class KVStreamletImpl<K, V>
- extends StreamletImpl<KeyValue<K, V>>
- implements KVStreamlet<K, V> {
-}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/KVStreamletShadow.java b/heron/api/src/java/org/apache/heron/streamlet/impl/KVStreamletShadow.java
new file mode 100644
index 0000000..bed7e92
--- /dev/null
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/KVStreamletShadow.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.streamlet.impl.streamlets;
+
+import org.apache.heron.streamlet.KVStreamlet;
+import org.apache.heron.streamlet.KeyValue;
+import org.apache.heron.streamlet.impl.StreamletImpl;
+
+/**
+ * KVStreamletShadow is a decorator for StreamletImpl<KeyValue<?, ?>> objects.
+ * Please check StreamShadow comments for more details.
+ *
+ * Usage:
+ * To create a shadow object that selecting "test" stream from an existing
+ * StreamletImpl<KeyValue<K, V>> object(stream):
+ *
+ * KVStreamlet<K, V> kv = new KVStreamletShadow<K, V>(stream)
+ *
+ */
+public class KVStreamletShadow<K, V>
+ extends StreamletShadow<KeyValue<K, V>>
+ implements KVStreamlet<K, V> {
+
+ // Note that KVStreamletShadow is constructed from StreamletImpl
+ public KVStreamletShadow(StreamletImpl<KeyValue<K, V>> real) {
+ super(real);
+ }
+
+ /*
+ * Functions accessible by child objects need to be overriden (forwarding the call to
+ * the real object since shadow object shouldn't have them)
+ */
+ @Override
+ public KVStreamletShadow<K, V> setName(String sName) {
+ super.setName(sName);
+ return this;
+ }
+
+ @Override
+ public KVStreamletShadow<K, V> setNumPartitions(int numPartitions) {
+ super.setNumPartitions(numPartitions);
+ return this;
+ }
+
+}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
index 3afa549..81a7d78 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java
@@ -53,6 +53,7 @@ import org.apache.heron.streamlet.impl.streamlets.FlatMapStreamlet;
import org.apache.heron.streamlet.impl.streamlets.GeneralReduceByKeyAndWindowStreamlet;
import org.apache.heron.streamlet.impl.streamlets.GeneralReduceByKeyStreamlet;
import org.apache.heron.streamlet.impl.streamlets.JoinStreamlet;
+import org.apache.heron.streamlet.impl.streamlets.KVStreamletShadow;
import org.apache.heron.streamlet.impl.streamlets.KeyByStreamlet;
import org.apache.heron.streamlet.impl.streamlets.LogStreamlet;
import org.apache.heron.streamlet.impl.streamlets.MapStreamlet;
@@ -450,7 +451,7 @@ public abstract class StreamletImpl<R> implements Streamlet<R> {
this, joinee, thisKeyExtractor, otherKeyExtractor, windowCfg, joinType, joinFunction);
addChild(retval);
joinee.addChild(retval);
- return retval;
+ return new KVStreamletShadow<KeyedWindow<K>, T>(retval);
}
/**
@@ -471,7 +472,7 @@ public abstract class StreamletImpl<R> implements Streamlet<R> {
ReduceByKeyStreamlet<R, K, T> retval =
new ReduceByKeyStreamlet<>(this, keyExtractor, valueExtractor, reduceFn);
addChild(retval);
- return retval;
+ return new KVStreamletShadow<K, T>(retval);
}
/**
@@ -491,7 +492,7 @@ public abstract class StreamletImpl<R> implements Streamlet<R> {
GeneralReduceByKeyStreamlet<R, K, T> retval =
new GeneralReduceByKeyStreamlet<>(this, keyExtractor, identity, reduceFn);
addChild(retval);
- return retval;
+ return new KVStreamletShadow<K, T>(retval);
}
/**
@@ -517,7 +518,7 @@ public abstract class StreamletImpl<R> implements Streamlet<R> {
new ReduceByKeyAndWindowStreamlet<>(this, keyExtractor, valueExtractor,
windowCfg, reduceFn);
addChild(retval);
- return retval;
+ return new KVStreamletShadow<KeyedWindow<K>, T>(retval);
}
/**
@@ -546,7 +547,7 @@ public abstract class StreamletImpl<R> implements Streamlet<R> {
new GeneralReduceByKeyAndWindowStreamlet<>(this, keyExtractor, windowCfg,
identity, reduceFn);
addChild(retval);
- return retval;
+ return new KVStreamletShadow<KeyedWindow<K>, T>(retval);
}
/**
@@ -691,7 +692,7 @@ public abstract class StreamletImpl<R> implements Streamlet<R> {
KeyByStreamlet<R, K, V> retval =
new KeyByStreamlet<R, K, V>(this, keyExtractor, valueExtractor);
addChild(retval);
- return retval;
+ return new KVStreamletShadow<K, V>(retval);
}
/**
@@ -705,7 +706,7 @@ public abstract class StreamletImpl<R> implements Streamlet<R> {
CountByKeyStreamlet<R, K> retval = new CountByKeyStreamlet<>(this, keyExtractor);
addChild(retval);
- return retval;
+ return new KVStreamletShadow<K, Long>(retval);
}
@@ -725,6 +726,6 @@ public abstract class StreamletImpl<R> implements Streamlet<R> {
CountByKeyAndWindowStreamlet<R, K> retval =
new CountByKeyAndWindowStreamlet<>(this, keyExtractor, windowCfg);
addChild(retval);
- return retval;
+ return new KVStreamletShadow<KeyedWindow<K>, Long>(retval);
}
}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletShadow.java b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletShadow.java
index fad23de..26e7eb6 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletShadow.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletShadow.java
@@ -58,6 +58,10 @@ public class StreamletShadow<R> extends StreamletImpl<R> {
this.real = real;
}
+ public StreamletImpl<R> getReal() {
+ return real;
+ }
+
/**
* Gets the stream id of this Streamlet.
* @return the stream id of this Streamlet
@@ -72,11 +76,23 @@ public class StreamletShadow<R> extends StreamletImpl<R> {
* the real object since shadow object shouldn't have them)
*/
@Override
+ public StreamletShadow<R> setName(String sName) {
+ real.setName(sName);
+ return this;
+ }
+
+ @Override
public String getName() {
return real.getName();
}
@Override
+ public StreamletShadow<R> setNumPartitions(int numPartitions) {
+ real.setNumPartitions(numPartitions);
+ return this;
+ }
+
+ @Override
public int getNumPartitions() {
return real.getNumPartitions();
}
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CountByKeyAndWindowStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CountByKeyAndWindowStreamlet.java
index 4a19e2a..698c478 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CountByKeyAndWindowStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CountByKeyAndWindowStreamlet.java
@@ -23,11 +23,11 @@ package org.apache.heron.streamlet.impl.streamlets;
import java.util.Set;
import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.streamlet.KeyValue;
import org.apache.heron.streamlet.KeyedWindow;
import org.apache.heron.streamlet.SerializableFunction;
import org.apache.heron.streamlet.StreamletReducers;
import org.apache.heron.streamlet.WindowConfig;
-import org.apache.heron.streamlet.impl.KVStreamletImpl;
import org.apache.heron.streamlet.impl.StreamletImpl;
import org.apache.heron.streamlet.impl.groupings.ReduceByKeyAndWindowCustomGrouping;
import org.apache.heron.streamlet.impl.operators.ReduceByKeyAndWindowOperator;
@@ -39,7 +39,7 @@ import org.apache.heron.streamlet.impl.operators.ReduceByKeyAndWindowOperator;
* KeyWindowInfo<K> type and the value is Long.
*/
public class CountByKeyAndWindowStreamlet<R, K>
- extends KVStreamletImpl<KeyedWindow<K>, Long> {
+ extends StreamletImpl<KeyValue<KeyedWindow<K>, Long>> {
private StreamletImpl<R> parent;
private SerializableFunction<R, K> keyExtractor;
private WindowConfig windowCfg;
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CountByKeyStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CountByKeyStreamlet.java
index 5c8328b..f339ef2 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CountByKeyStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CountByKeyStreamlet.java
@@ -23,9 +23,9 @@ package org.apache.heron.streamlet.impl.streamlets;
import java.util.Set;
import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.streamlet.KeyValue;
import org.apache.heron.streamlet.SerializableFunction;
import org.apache.heron.streamlet.StreamletReducers;
-import org.apache.heron.streamlet.impl.KVStreamletImpl;
import org.apache.heron.streamlet.impl.StreamletImpl;
import org.apache.heron.streamlet.impl.groupings.ReduceByKeyAndWindowCustomGrouping;
import org.apache.heron.streamlet.impl.operators.ReduceByKeyOperator;
@@ -38,7 +38,7 @@ import org.apache.heron.streamlet.impl.operators.ReduceByKeyOperator;
* ReduceByKeyAndWindowStreamlet's elements are of KeyValue type where the key is
* KeyWindowInfo<K> type and the value is of type V.
*/
-public class CountByKeyStreamlet<R, K> extends KVStreamletImpl<K, Long> {
+public class CountByKeyStreamlet<R, K> extends StreamletImpl<KeyValue<K, Long>> {
private StreamletImpl<R> parent;
private SerializableFunction<R, K> keyExtractor;
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/GeneralReduceByKeyAndWindowStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/GeneralReduceByKeyAndWindowStreamlet.java
index c2b8747..f95d7d2 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/GeneralReduceByKeyAndWindowStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/GeneralReduceByKeyAndWindowStreamlet.java
@@ -23,11 +23,11 @@ package org.apache.heron.streamlet.impl.streamlets;
import java.util.Set;
import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.streamlet.KeyValue;
import org.apache.heron.streamlet.KeyedWindow;
import org.apache.heron.streamlet.SerializableBiFunction;
import org.apache.heron.streamlet.SerializableFunction;
import org.apache.heron.streamlet.WindowConfig;
-import org.apache.heron.streamlet.impl.KVStreamletImpl;
import org.apache.heron.streamlet.impl.StreamletImpl;
import org.apache.heron.streamlet.impl.groupings.ReduceByKeyAndWindowCustomGrouping;
import org.apache.heron.streamlet.impl.operators.GeneralReduceByKeyAndWindowOperator;
@@ -40,7 +40,7 @@ import org.apache.heron.streamlet.impl.operators.GeneralReduceByKeyAndWindowOper
* KeyWindowInfo<K> type and the value is of type T.
*/
public class GeneralReduceByKeyAndWindowStreamlet<R, K, T>
- extends KVStreamletImpl<KeyedWindow<K>, T> {
+ extends StreamletImpl<KeyValue<KeyedWindow<K>, T>> {
private StreamletImpl<R> parent;
private SerializableFunction<R, K> keyExtractor;
private WindowConfig windowCfg;
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/GeneralReduceByKeyStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/GeneralReduceByKeyStreamlet.java
index 9b4d2fe..277a1e8 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/GeneralReduceByKeyStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/GeneralReduceByKeyStreamlet.java
@@ -23,9 +23,9 @@ package org.apache.heron.streamlet.impl.streamlets;
import java.util.Set;
import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.streamlet.KeyValue;
import org.apache.heron.streamlet.SerializableBiFunction;
import org.apache.heron.streamlet.SerializableFunction;
-import org.apache.heron.streamlet.impl.KVStreamletImpl;
import org.apache.heron.streamlet.impl.StreamletImpl;
import org.apache.heron.streamlet.impl.groupings.ReduceByKeyAndWindowCustomGrouping;
import org.apache.heron.streamlet.impl.operators.GeneralReduceByKeyOperator;
@@ -36,7 +36,7 @@ import org.apache.heron.streamlet.impl.operators.GeneralReduceByKeyOperator;
* GeneralReduceByKeyStreamlet's elements are of KeyValue type where the key is
* KeyWindowInfo<K> type and the value is of type T.
*/
-public class GeneralReduceByKeyStreamlet<R, K, T> extends KVStreamletImpl<K, T> {
+public class GeneralReduceByKeyStreamlet<R, K, T> extends StreamletImpl<KeyValue<K, T>> {
private StreamletImpl<R> parent;
private SerializableFunction<R, K> keyExtractor;
private T identity;
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/JoinStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/JoinStreamlet.java
index ce03f97..40f76ef 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/JoinStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/JoinStreamlet.java
@@ -24,11 +24,11 @@ import java.util.Set;
import org.apache.heron.api.topology.TopologyBuilder;
import org.apache.heron.streamlet.JoinType;
+import org.apache.heron.streamlet.KeyValue;
import org.apache.heron.streamlet.KeyedWindow;
import org.apache.heron.streamlet.SerializableBiFunction;
import org.apache.heron.streamlet.SerializableFunction;
import org.apache.heron.streamlet.WindowConfig;
-import org.apache.heron.streamlet.impl.KVStreamletImpl;
import org.apache.heron.streamlet.impl.StreamletImpl;
import org.apache.heron.streamlet.impl.groupings.JoinCustomGrouping;
import org.apache.heron.streamlet.impl.operators.JoinOperator;
@@ -40,7 +40,7 @@ import org.apache.heron.streamlet.impl.operators.JoinOperator;
* JoinStreamlet's elements are of KeyValue type where the key is KeyWindowInfo<K> type
* and the value is of type VR.
*/
-public final class JoinStreamlet<K, R, S, T> extends KVStreamletImpl<KeyedWindow<K>, T> {
+public final class JoinStreamlet<K, R, S, T> extends StreamletImpl<KeyValue<KeyedWindow<K>, T>> {
private JoinType joinType;
private StreamletImpl<R> left;
private StreamletImpl<S> right;
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/KeyByStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/KeyByStreamlet.java
index df76eab..1c1eb16 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/KeyByStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/KeyByStreamlet.java
@@ -23,8 +23,8 @@ package org.apache.heron.streamlet.impl.streamlets;
import java.util.Set;
import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.streamlet.KeyValue;
import org.apache.heron.streamlet.SerializableFunction;
-import org.apache.heron.streamlet.impl.KVStreamletImpl;
import org.apache.heron.streamlet.impl.StreamletImpl;
import org.apache.heron.streamlet.impl.operators.KeyByOperator;
@@ -32,7 +32,7 @@ import org.apache.heron.streamlet.impl.operators.KeyByOperator;
* KeyByStreamlet represents a KVStreamlet that is the result of applying key and value extractors
* on all elements.
*/
-public class KeyByStreamlet<R, K, V> extends KVStreamletImpl<K, V> {
+public class KeyByStreamlet<R, K, V> extends StreamletImpl<KeyValue<K, V>> {
private StreamletImpl<R> parent;
private SerializableFunction<R, K> keyExtractor;
private SerializableFunction<R, V> valueExtractor;
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ReduceByKeyAndWindowStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ReduceByKeyAndWindowStreamlet.java
index 2c399f3..18fea4d 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ReduceByKeyAndWindowStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ReduceByKeyAndWindowStreamlet.java
@@ -23,11 +23,11 @@ package org.apache.heron.streamlet.impl.streamlets;
import java.util.Set;
import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.streamlet.KeyValue;
import org.apache.heron.streamlet.KeyedWindow;
import org.apache.heron.streamlet.SerializableBinaryOperator;
import org.apache.heron.streamlet.SerializableFunction;
import org.apache.heron.streamlet.WindowConfig;
-import org.apache.heron.streamlet.impl.KVStreamletImpl;
import org.apache.heron.streamlet.impl.StreamletImpl;
import org.apache.heron.streamlet.impl.groupings.ReduceByKeyAndWindowCustomGrouping;
import org.apache.heron.streamlet.impl.operators.ReduceByKeyAndWindowOperator;
@@ -40,7 +40,7 @@ import org.apache.heron.streamlet.impl.operators.ReduceByKeyAndWindowOperator;
* KeyWindowInfo<K> type and the value is of type T.
*/
public class ReduceByKeyAndWindowStreamlet<R, K, T>
- extends KVStreamletImpl<KeyedWindow<K>, T> {
+ extends StreamletImpl<KeyValue<KeyedWindow<K>, T>> {
private StreamletImpl<R> parent;
private SerializableFunction<R, K> keyExtractor;
private SerializableFunction<R, T> valueExtractor;
diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ReduceByKeyStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ReduceByKeyStreamlet.java
index ea84687..9176618 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ReduceByKeyStreamlet.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/ReduceByKeyStreamlet.java
@@ -23,9 +23,9 @@ package org.apache.heron.streamlet.impl.streamlets;
import java.util.Set;
import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.streamlet.KeyValue;
import org.apache.heron.streamlet.SerializableBinaryOperator;
import org.apache.heron.streamlet.SerializableFunction;
-import org.apache.heron.streamlet.impl.KVStreamletImpl;
import org.apache.heron.streamlet.impl.StreamletImpl;
import org.apache.heron.streamlet.impl.groupings.ReduceByKeyAndWindowCustomGrouping;
import org.apache.heron.streamlet.impl.operators.ReduceByKeyOperator;
@@ -36,7 +36,7 @@ import org.apache.heron.streamlet.impl.operators.ReduceByKeyOperator;
* ReduceByKeyAndWindowStreamlet's elements are of KeyValue type where the key is
* KeyWindowInfo<K> type and the value is of type T.
*/
-public class ReduceByKeyStreamlet<R, K, T> extends KVStreamletImpl<K, T> {
+public class ReduceByKeyStreamlet<R, K, T> extends StreamletImpl<KeyValue<K, T>> {
private StreamletImpl<R> parent;
private SerializableFunction<R, K> keyExtractor;
private SerializableFunction<R, T> valueExtractor;
diff --git a/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala b/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala
index ea68949..fe6b7df 100644
--- a/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala
+++ b/heron/api/src/scala/org/apache/heron/streamlet/scala/impl/StreamletImpl.scala
@@ -35,7 +35,6 @@ import org.apache.heron.streamlet.{
WindowConfig
}
import org.apache.heron.streamlet.impl.{
- KVStreamletImpl => JavaKVStreamletImpl,
StreamletImpl => JavaStreamletImpl
}
import org.apache.heron.streamlet.impl.streamlets.SupplierStreamlet
diff --git a/heron/api/tests/java/BUILD b/heron/api/tests/java/BUILD
index d8efbdf..e38069e 100644
--- a/heron/api/tests/java/BUILD
+++ b/heron/api/tests/java/BUILD
@@ -32,6 +32,8 @@ java_tests(
"org.apache.heron.streamlet.impl.operators.KeyByOperatorTest",
"org.apache.heron.streamlet.impl.operators.ReduceByKeyAndWindowOperatorTest",
"org.apache.heron.streamlet.impl.operators.GeneralReduceByKeyAndWindowOperatorTest",
+ "org.apache.heron.streamlet.impl.streamlets.KVStreamletShadowTest",
+ "org.apache.heron.streamlet.impl.streamlets.StreamletShadowTest",
"org.apache.heron.streamlet.impl.utils.StreamletUtilsTest",
"org.apache.heron.api.ConfigTest",
"org.apache.heron.api.HeronSubmitterTest",
diff --git a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletShadowTest.java b/heron/api/tests/java/org/apache/heron/streamlet/impl/KVStreamletShadowTest.java
similarity index 70%
copy from heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletShadowTest.java
copy to heron/api/tests/java/org/apache/heron/streamlet/impl/KVStreamletShadowTest.java
index fbbd803..964959e 100644
--- a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletShadowTest.java
+++ b/heron/api/tests/java/org/apache/heron/streamlet/impl/KVStreamletShadowTest.java
@@ -18,16 +18,16 @@
*/
package org.apache.heron.streamlet.impl.streamlets;
-import java.util.ArrayList;
-
import org.junit.Test;
import org.mockito.Mock;
+import org.apache.heron.streamlet.KeyValue;
import org.apache.heron.streamlet.impl.StreamletImpl;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -35,36 +35,33 @@ import static org.mockito.Mockito.verify;
/**
* Unit tests for {@link StreamletShadow}
*/
-public class StreamletShadowTest {
+public class KVStreamletShadowTest {
@Mock
- private StreamletImpl<Double> mockReal;
+ private StreamletImpl<KeyValue<String, Double>> mockReal = mock(StreamletImpl.class);
@Mock
- private StreamletImpl<Double> mockChild;
+ private StreamletImpl<Double> mockChild = mock(StreamletImpl.class);
@Test
public void testConstruction() {
doReturn("real_name").when(mockReal).getName();
doReturn(1).when(mockReal).getNumPartitions();
doNothing().when(mockReal).addChild(mockChild);
- doReturn(new ArrayList<StreamletImpl<Double>>()).when(mockReal).getChildren();
+ doReturn(mockReal).when(mockReal).setName("shadow_name");
+ doReturn(mockReal).when(mockReal).setNumPartitions(2);
doReturn("real_stream").when(mockReal).getStreamId();
- StreamletShadow<Double> shadow = new StreamletShadow(mockReal);
+ KVStreamletShadow<String, Double> shadow = new KVStreamletShadow(mockReal);
assertEquals(shadow.getName(), "real_name");
assertEquals(shadow.getNumPartitions(), 1);
- assertEquals(shadow.getStreamId(), "real_stream");
- // set a different stream id
- StreamletShadow<Double> shadow2 = new StreamletShadow(mockReal) {
- @Override
- public String getStreamId() {
- return "shadow_stream";
- }
- };
- assertEquals(shadow.getName(), "real_name");
- assertEquals(shadow.getNumPartitions(), 1);
- assertEquals(shadow.getStreamId(), "shadow_stream");
+ // Set name/partition should be applied to the real object
+ KVStreamletShadow<String, Double> shadow2 = new KVStreamletShadow(mockReal)
+ .setName("shadow_name")
+ .setNumPartitions(2);
+
+ verify(mockReal, times(1)).setName("shadow_name");
+ verify(mockReal, times(1)).setNumPartitions(2);
// addChild call should be forwarded to the real object
verify(mockReal, never()).addChild(mockChild);
diff --git a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
index fdb860c..345e417 100644
--- a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
+++ b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
@@ -61,6 +61,7 @@ import org.apache.heron.streamlet.impl.streamlets.FilterStreamlet;
import org.apache.heron.streamlet.impl.streamlets.FlatMapStreamlet;
import org.apache.heron.streamlet.impl.streamlets.GeneralReduceByKeyStreamlet;
import org.apache.heron.streamlet.impl.streamlets.JoinStreamlet;
+import org.apache.heron.streamlet.impl.streamlets.KVStreamletShadow;
import org.apache.heron.streamlet.impl.streamlets.KeyByStreamlet;
import org.apache.heron.streamlet.impl.streamlets.MapStreamlet;
import org.apache.heron.streamlet.impl.streamlets.ReduceByKeyAndWindowStreamlet;
@@ -363,15 +364,16 @@ public class StreamletImplTest {
@Test
public void testKeyByStreamlet() {
Streamlet<Double> baseStreamlet = builder.newSource(() -> Math.random());
- KVStreamlet<Long, Double> kvStream = baseStreamlet.keyBy(x -> Math.round(x));
+ KVStreamlet<Long, Double> streamlet = baseStreamlet.keyBy(x -> Math.round(x));
- assertTrue(kvStream instanceof KeyByStreamlet);
- KeyByStreamlet<Double, Long, Double> mStreamlet =
- (KeyByStreamlet<Double, Long, Double>) kvStream;
+ assertTrue(streamlet instanceof KVStreamletShadow);
+ KVStreamletShadow<Long, Double> mStreamlet =
+ (KVStreamletShadow<Long, Double>) streamlet;
+ assertTrue(mStreamlet.getReal() instanceof KeyByStreamlet);
assertEquals(1, mStreamlet.getNumPartitions());
SupplierStreamlet<Double> supplierStreamlet = (SupplierStreamlet<Double>) baseStreamlet;
assertEquals(supplierStreamlet.getChildren().size(), 1);
- assertEquals(supplierStreamlet.getChildren().get(0), kvStream);
+ assertEquals(supplierStreamlet.getChildren().get(0), mStreamlet.getReal());
}
@Test
@@ -383,13 +385,14 @@ public class StreamletImplTest {
x -> x,
StreamletReducers::sum);
- assertTrue(streamlet instanceof ReduceByKeyStreamlet);
- ReduceByKeyStreamlet<Double, String, Double> mStreamlet =
- (ReduceByKeyStreamlet<Double, String, Double>) streamlet;
+ assertTrue(streamlet instanceof KVStreamletShadow);
+ KVStreamletShadow<String, Double> mStreamlet =
+ (KVStreamletShadow<String, Double>) streamlet;
+ assertTrue(mStreamlet.getReal() instanceof ReduceByKeyStreamlet);
assertEquals(20, mStreamlet.getNumPartitions());
SupplierStreamlet<Double> supplierStreamlet = (SupplierStreamlet<Double>) baseStreamlet;
assertEquals(supplierStreamlet.getChildren().size(), 1);
- assertEquals(supplierStreamlet.getChildren().get(0), streamlet);
+ assertEquals(supplierStreamlet.getChildren().get(0), mStreamlet.getReal());
}
@Test
@@ -401,13 +404,14 @@ public class StreamletImplTest {
0.0,
StreamletReducers::sum);
- assertTrue(streamlet instanceof GeneralReduceByKeyStreamlet);
- GeneralReduceByKeyStreamlet<Double, String, Double> mStreamlet =
- (GeneralReduceByKeyStreamlet<Double, String, Double>) streamlet;
+ assertTrue(streamlet instanceof KVStreamletShadow);
+ KVStreamletShadow<String, Double> mStreamlet =
+ (KVStreamletShadow<String, Double>) streamlet;
+ assertTrue(mStreamlet.getReal() instanceof GeneralReduceByKeyStreamlet);
assertEquals(20, mStreamlet.getNumPartitions());
SupplierStreamlet<Double> supplierStreamlet = (SupplierStreamlet<Double>) baseStreamlet;
assertEquals(supplierStreamlet.getChildren().size(), 1);
- assertEquals(supplierStreamlet.getChildren().get(0), streamlet);
+ assertEquals(supplierStreamlet.getChildren().get(0), mStreamlet.getReal());
}
@Test
@@ -417,13 +421,14 @@ public class StreamletImplTest {
KVStreamlet<String, Long> streamlet = baseStreamlet.setNumPartitions(20)
.countByKey(x -> (x > 0) ? "positive" : ((x < 0) ? "negative" : "zero"));
- assertTrue(streamlet instanceof CountByKeyStreamlet);
- CountByKeyStreamlet<Double, String> mStreamlet =
- (CountByKeyStreamlet<Double, String>) streamlet;
+ assertTrue(streamlet instanceof KVStreamletShadow);
+ KVStreamletShadow<String, Long> mStreamlet =
+ (KVStreamletShadow<String, Long>) streamlet;
+ assertTrue(mStreamlet.getReal() instanceof CountByKeyStreamlet);
assertEquals(20, mStreamlet.getNumPartitions());
SupplierStreamlet<Double> supplierStreamlet = (SupplierStreamlet<Double>) baseStreamlet;
assertEquals(supplierStreamlet.getChildren().size(), 1);
- assertEquals(supplierStreamlet.getChildren().get(0), streamlet);
+ assertEquals(supplierStreamlet.getChildren().get(0), mStreamlet.getReal());
}
@Test
@@ -434,13 +439,14 @@ public class StreamletImplTest {
.countByKeyAndWindow(x -> (x > 0) ? "positive" : ((x < 0) ? "negative" : "zero"),
WindowConfig.TumblingCountWindow(10));
- assertTrue(streamlet instanceof CountByKeyAndWindowStreamlet);
- CountByKeyAndWindowStreamlet<Double, String> mStreamlet =
- (CountByKeyAndWindowStreamlet<Double, String>) streamlet;
+ assertTrue(streamlet instanceof KVStreamletShadow);
+ KVStreamletShadow<KeyedWindow<String>, Long> mStreamlet =
+ (KVStreamletShadow<KeyedWindow<String>, Long>) streamlet;
+ assertTrue(mStreamlet.getReal() instanceof CountByKeyAndWindowStreamlet);
assertEquals(20, mStreamlet.getNumPartitions());
SupplierStreamlet<Double> supplierStreamlet = (SupplierStreamlet<Double>) baseStreamlet;
assertEquals(supplierStreamlet.getChildren().size(), 1);
- assertEquals(supplierStreamlet.getChildren().get(0), streamlet);
+ assertEquals(supplierStreamlet.getChildren().get(0), mStreamlet.getReal());
}
@Test
diff --git a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletShadowTest.java b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletShadowTest.java
index fbbd803..fe7ecbe 100644
--- a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletShadowTest.java
+++ b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletShadowTest.java
@@ -18,8 +18,6 @@
*/
package org.apache.heron.streamlet.impl.streamlets;
-import java.util.ArrayList;
-
import org.junit.Test;
import org.mockito.Mock;
@@ -28,6 +26,7 @@ import org.apache.heron.streamlet.impl.StreamletImpl;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -37,17 +36,18 @@ import static org.mockito.Mockito.verify;
*/
public class StreamletShadowTest {
@Mock
- private StreamletImpl<Double> mockReal;
+ private StreamletImpl<Double> mockReal = mock(StreamletImpl.class);
@Mock
- private StreamletImpl<Double> mockChild;
+ private StreamletImpl<Double> mockChild = mock(StreamletImpl.class);
@Test
public void testConstruction() {
doReturn("real_name").when(mockReal).getName();
doReturn(1).when(mockReal).getNumPartitions();
doNothing().when(mockReal).addChild(mockChild);
- doReturn(new ArrayList<StreamletImpl<Double>>()).when(mockReal).getChildren();
+ doReturn(mockReal).when(mockReal).setName("shadow_name");
+ doReturn(mockReal).when(mockReal).setNumPartitions(2);
doReturn("real_stream").when(mockReal).getStreamId();
StreamletShadow<Double> shadow = new StreamletShadow(mockReal);
@@ -62,9 +62,14 @@ public class StreamletShadowTest {
return "shadow_stream";
}
};
- assertEquals(shadow.getName(), "real_name");
- assertEquals(shadow.getNumPartitions(), 1);
- assertEquals(shadow.getStreamId(), "shadow_stream");
+ assertEquals(shadow2.getName(), "real_name");
+ assertEquals(shadow2.getNumPartitions(), 1);
+ assertEquals(shadow2.getStreamId(), "shadow_stream");
+
+ // Set name/partition should be applied to the real object
+ shadow.setName("shadow_name").setNumPartitions(2);
+ verify(mockReal, times(1)).setName("shadow_name");
+ verify(mockReal, times(1)).setNumPartitions(2);
// addChild call should be forwarded to the real object
verify(mockReal, never()).addChild(mockChild);