You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/10/17 16:51:00 UTC
[08/11] incubator-beam git commit: BEAM-261 Support multiple side
inputs.
BEAM-261 Support multiple side inputs.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1ec7cd91
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1ec7cd91
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1ec7cd91
Branch: refs/heads/apex-runner
Commit: 1ec7cd9129fc31ece7554e2ea18535ce15e46bcf
Parents: fd7f46c
Author: Thomas Weise <th...@apache.org>
Authored: Thu Oct 13 14:38:06 2016 -0700
Committer: Thomas Weise <th...@apache.org>
Committed: Mon Oct 17 09:22:49 2016 -0700
----------------------------------------------------------------------
.../runners/apex/ApexPipelineTranslator.java | 19 ++++++-
.../apache/beam/runners/apex/ApexRunner.java | 7 ++-
.../beam/runners/apex/ApexRunnerResult.java | 7 +++
.../FlattenPCollectionTranslator.java | 38 +++++++++++---
.../translators/ParDoBoundMultiTranslator.java | 55 +++++++++++++++++---
.../apex/translators/ParDoBoundTranslator.java | 14 +----
.../functions/ApexFlattenOperator.java | 11 ++++
.../functions/ApexParDoOperator.java | 13 +++--
.../apex/translators/utils/ApexStreamTuple.java | 22 ++++++--
9 files changed, 148 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1ec7cd91/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
index 40edfb1..a16f551 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
@@ -37,6 +37,7 @@ import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PValue;
import org.slf4j.Logger;
@@ -74,7 +75,8 @@ public class ApexPipelineTranslator implements Pipeline.PipelineVisitor {
registerTransformTranslator(Flatten.FlattenPCollectionList.class,
new FlattenPCollectionTranslator());
registerTransformTranslator(Create.Values.class, new CreateValuesTranslator());
- registerTransformTranslator(CreateApexPCollectionView.class, new CreatePCollectionViewTranslator());
+ registerTransformTranslator(CreateApexPCollectionView.class, new CreateApexPCollectionViewTranslator());
+ registerTransformTranslator(CreatePCollectionView.class, new CreatePCollectionViewTranslator());
}
public ApexPipelineTranslator(TranslationContext translationContext) {
@@ -151,7 +153,7 @@ public class ApexPipelineTranslator implements Pipeline.PipelineVisitor {
}
- private static class CreatePCollectionViewTranslator<ElemT, ViewT> implements TransformTranslator<CreateApexPCollectionView<ElemT, ViewT>>
+ private static class CreateApexPCollectionViewTranslator<ElemT, ViewT> implements TransformTranslator<CreateApexPCollectionView<ElemT, ViewT>>
{
private static final long serialVersionUID = 1L;
@@ -164,4 +166,17 @@ public class ApexPipelineTranslator implements Pipeline.PipelineVisitor {
}
}
+ private static class CreatePCollectionViewTranslator<ElemT, ViewT> implements TransformTranslator<CreatePCollectionView<ElemT, ViewT>>
+ {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void translate(CreatePCollectionView<ElemT, ViewT> transform, TranslationContext context)
+ {
+ PCollectionView<ViewT> view = transform.getView();
+ context.addView(view);
+ LOG.debug("view {}", view.getName());
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1ec7cd91/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
index ad49f08..667f1c8 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -74,7 +74,7 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
/**
* TODO: this isn't thread safe and may cause issues when tests run in parallel
* Holds any most resent assertion error that was raised while processing elements.
- * Used in the unit test driver in embedded to propagate the exception.
+ * Used in the unit test driver in embedded mode to propagate the exception.
*/
public static volatile AssertionError assertionError;
@@ -100,6 +100,8 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
WindowingStrategy.globalDefault(),
PCollection.IsBounded.BOUNDED);
// TODO: replace this with a mapping
+////
+
} else if (Combine.GloballyAsSingletonView.class.equals(transform.getClass())) {
PTransform<InputT, OutputT> customTransform = (PTransform)new StreamingCombineGloballyAsSingletonView<InputT, OutputT>(this,
(Combine.GloballyAsSingletonView)transform);
@@ -109,6 +111,7 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
PTransform<InputT, OutputT> customTransform = (PTransform)new StreamingViewAsSingleton<InputT>(this,
(View.AsSingleton)transform);
return Pipeline.applyTransform(input, customTransform);
+/*
} else if (View.AsIterable.class.equals(transform.getClass())) {
PTransform<InputT, OutputT> customTransform = (PTransform)new StreamingViewAsIterable<InputT>(this,
(View.AsIterable)transform);
@@ -125,6 +128,8 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
PTransform<InputT, OutputT> customTransform = new StreamingViewAsMultimap(this,
(View.AsMultimap)transform);
return Pipeline.applyTransform(input, customTransform);
+*/
+////
} else {
return super.apply(transform, input);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1ec7cd91/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
index f28c8dc..6817684 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.apex;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricResults;
import java.io.IOException;
@@ -74,6 +75,12 @@ public class ApexRunnerResult implements PipelineResult {
throw new UnsupportedOperationException();
}
+ @Override
+ public MetricResults metrics()
+ {
+ throw new UnsupportedOperationException();
+ }
+
/**
* Return the DAG executed by the pipeline.
* @return
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1ec7cd91/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java
index 90ab81f..6737767 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.apex.translators;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import org.apache.beam.runners.apex.translators.functions.ApexFlattenOperator;
import org.apache.beam.runners.apex.translators.io.ApexReadUnboundedInputOperator;
@@ -53,9 +54,25 @@ public class FlattenPCollectionTranslator<T> implements
ApexReadUnboundedInputOperator<T, ?> operator = new ApexReadUnboundedInputOperator<>(
unboundedSource, context.getPipelineOptions());
context.addOperator(operator, operator.output);
- return;
+ } else {
+ PCollection<T> output = context.getOutput();
+ Map<PCollection<?>, Integer> unionTags = Collections.emptyMap();
+ flattenCollections(collections, unionTags, output, context);
}
+ }
+ /**
+ * Flatten the given collections into the given result collection. Translates
+ * into a cascading merge with 2 input ports per operator. The optional union
+ * tags can be used to identify the source in the result stream, used to
+ * channel multiple side inputs to a single Apex operator port.
+ *
+ * @param collections
+ * @param unionTags
+ * @param finalCollection
+ * @param context
+ */
+ static <T> void flattenCollections(List<PCollection<T>> collections, Map<PCollection<?>, Integer> unionTags, PCollection<T> finalCollection, TranslationContext context) {
List<PCollection<T>> remainingCollections = Lists.newArrayList();
PCollection<T> firstCollection = null;
while (!collections.isEmpty()) {
@@ -65,14 +82,23 @@ public class FlattenPCollectionTranslator<T> implements
} else {
ApexFlattenOperator<T> operator = new ApexFlattenOperator<>();
context.addStream(firstCollection, operator.data1);
+ Integer unionTag = unionTags.get(firstCollection);
+ operator.data1Tag = (unionTag != null) ? unionTag : 0;
context.addStream(collection, operator.data2);
+ unionTag = unionTags.get(collection);
+ operator.data2Tag = (unionTag != null) ? unionTag : 0;
+
+ if (!collection.getCoder().equals(firstCollection.getCoder())) {
+ throw new UnsupportedOperationException("coders don't match");
+ }
+
if (collections.size() > 2) {
- PCollection<T> resultCollection = intermediateCollection(collection, collection.getCoder());
- context.addOperator(operator, operator.out, resultCollection);
- remainingCollections.add(resultCollection);
+ PCollection<T> intermediateCollection = intermediateCollection(collection, collection.getCoder());
+ context.addOperator(operator, operator.out, intermediateCollection);
+ remainingCollections.add(intermediateCollection);
} else {
// final stream merge
- context.addOperator(operator, operator.out);
+ context.addOperator(operator, operator.out, finalCollection);
}
firstCollection = null;
}
@@ -91,7 +117,7 @@ public class FlattenPCollectionTranslator<T> implements
}
}
- public static <T> PCollection<T> intermediateCollection(PCollection<T> input, Coder<T> outputCoder) {
+ static <T> PCollection<T> intermediateCollection(PCollection<T> input, Coder<T> outputCoder) {
PCollection<T> output = PCollection.createPrimitiveOutputInternal(input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
output.setCoder(outputCoder);
return output;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1ec7cd91/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java
index 9c5f2b5..a229a81 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java
@@ -18,6 +18,10 @@
package org.apache.beam.runners.apex.translators;
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -63,20 +67,55 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> implements TransformTran
ports.put(outputEntry.getValue(), operator.sideOutputPorts[i++]);
}
context.addOperator(operator, ports);
-
context.addStream(context.getInput(), operator.input);
if (!sideInputs.isEmpty()) {
- Operator.InputPort<?>[] sideInputPorts = {operator.sideInput1};
- for (i=0; i<sideInputs.size(); i++) {
+ addSideInputs(operator, sideInputs, context);
+ }
+ }
+
+ static void addSideInputs(ApexParDoOperator<?, ?> operator, List<PCollectionView<?>> sideInputs, TranslationContext context) {
+ Operator.InputPort<?>[] sideInputPorts = {operator.sideInput1};
+ if (sideInputs.size() > sideInputPorts.length) {
+ // String msg = String.format("Too many side inputs in %s (currently only supporting %s).",
+ // transform.toString(), sideInputPorts.length);
+ // throw new UnsupportedOperationException(msg);
+ PCollection<?> unionCollection = unionSideInputs(sideInputs, context);
+ context.addStream(unionCollection, sideInputPorts[0]);
+ } else {
+ for (int i=0; i<sideInputs.size(); i++) {
// the number of input ports for side inputs are fixed and each port can only take one input.
// more (optional) ports can be added to give reasonable capacity or an explicit union operation introduced.
- if (i == sideInputPorts.length) {
- String msg = String.format("Too many side inputs in %s (currently only supporting %s).",
- transform.toString(), sideInputPorts.length);
- throw new UnsupportedOperationException(msg);
- }
context.addStream(context.getViewInput(sideInputs.get(i)), sideInputPorts[i]);
}
}
}
+
+ private static PCollection<?> unionSideInputs(List<PCollectionView<?>> sideInputs, TranslationContext context) {
+ checkArgument(sideInputs.size() > 1, "requires multiple side inputs");
+ // flatten and assign union tag
+ List<PCollection<Object>> sourceCollections = new ArrayList<>();
+ Map<PCollection<?>, Integer> unionTags = new HashMap<>();
+ PCollection<Object> firstSideInput = context.getViewInput(sideInputs.get(0));
+ for (int i=0; i < sideInputs.size(); i++) {
+ PCollectionView<?> sideInput = sideInputs.get(i);
+ PCollection<?> sideInputCollection = context.getViewInput(sideInput);
+ if (!sideInputCollection.getWindowingStrategy().equals(firstSideInput.getWindowingStrategy())) {
+ // TODO: check how to handle this in stream codec
+ //String msg = "Multiple side inputs with different window strategies.";
+ //throw new UnsupportedOperationException(msg);
+ }
+ if (!sideInputCollection.getCoder().equals(firstSideInput.getCoder())) {
+ String msg = "Multiple side inputs with different coders.";
+ throw new UnsupportedOperationException(msg);
+ }
+ sourceCollections.add(context.<PCollection<Object>>getViewInput(sideInput));
+ unionTags.put(sideInputCollection, i);
+ }
+
+ PCollection<Object> resultCollection = FlattenPCollectionTranslator.intermediateCollection(firstSideInput, firstSideInput.getCoder());
+ FlattenPCollectionTranslator.flattenCollections(sourceCollections, unionTags, resultCollection, context);
+ return resultCollection;
+
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1ec7cd91/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java
index 8a7dd4b..7749a06 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java
@@ -32,8 +32,6 @@ import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
-import com.datatorrent.api.Operator;
-
/**
* {@link ParDo.Bound} is translated to Apex operator that wraps the {@link DoFn}
*/
@@ -57,17 +55,7 @@ public class ParDoBoundTranslator<InputT, OutputT> implements
context.addOperator(operator, operator.output);
context.addStream(context.getInput(), operator.input);
if (!sideInputs.isEmpty()) {
- Operator.InputPort<?>[] sideInputPorts = {operator.sideInput1};
- for (int i=0; i<sideInputs.size(); i++) {
- // the number of input ports for side inputs are fixed and each port can only take one input.
- // more (optional) ports can be added to give reasonable capacity or an explicit union operation introduced.
- if (i == sideInputPorts.length) {
- String msg = String.format("Too many side inputs in %s (currently only supporting %s).",
- transform.toString(), sideInputPorts.length);
- throw new UnsupportedOperationException(msg);
- }
- context.addStream(context.getViewInput(sideInputs.get(i)), sideInputPorts[i]);
- }
+ ParDoBoundMultiTranslator.addSideInputs(operator, sideInputs, context);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1ec7cd91/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java
index 4675244..202f2d3 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java
@@ -41,6 +41,9 @@ public class ApexFlattenOperator<InputT> extends BaseOperator
private long inputWM2;
private long outputWM;
+ public int data1Tag;
+ public int data2Tag;
+
/**
* Data input port 1.
*/
@@ -70,6 +73,10 @@ public class ApexFlattenOperator<InputT> extends BaseOperator
if (traceTuples) {
LOG.debug("\nemitting {}\n", tuple);
}
+
+ if (data1Tag > 0 && tuple instanceof ApexStreamTuple.DataTuple) {
+ ((ApexStreamTuple.DataTuple<?>)tuple).setUnionTag(data1Tag);
+ }
out.emit(tuple);
}
};
@@ -103,6 +110,10 @@ public class ApexFlattenOperator<InputT> extends BaseOperator
if (traceTuples) {
LOG.debug("\nemitting {}\n", tuple);
}
+
+ if (data2Tag > 0 && tuple instanceof ApexStreamTuple.DataTuple) {
+ ((ApexStreamTuple.DataTuple<?>)tuple).setUnionTag(data2Tag);
+ }
out.emit(tuple);
}
};
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1ec7cd91/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java
index a951ca7..96be11d 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java
@@ -34,7 +34,6 @@ import org.apache.beam.runners.core.SideInputHandler;
import org.apache.beam.runners.core.DoFnRunners.OutputManager;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.ListCoder;
-import org.apache.beam.sdk.repackaged.com.google.common.base.Throwables;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
@@ -60,6 +59,7 @@ import com.datatorrent.api.annotation.InputPortFieldAnnotation;
import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
import com.datatorrent.common.util.BaseOperator;
import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
+import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.esotericsoftware.kryo.serializers.JavaSerializer;
@@ -158,8 +158,6 @@ private transient StateInternals<Void> sideInputStateInternals = InMemoryStateIn
@InputPortFieldAnnotation(optional=true)
public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<Iterable<?>>>> sideInput1 = new DefaultInputPort<ApexStreamTuple<WindowedValue<Iterable<?>>>>()
{
- private final int sideInputIndex = 0;
-
@Override
public void process(ApexStreamTuple<WindowedValue<Iterable<?>>> t)
{
@@ -167,9 +165,16 @@ private transient StateInternals<Void> sideInputStateInternals = InMemoryStateIn
// ignore side input watermarks
return;
}
+
+ int sideInputIndex = 0;
+ if (t instanceof ApexStreamTuple.DataTuple) {
+ sideInputIndex = ((ApexStreamTuple.DataTuple<?>)t).getUnionTag();
+ }
+
if (traceTuples) {
- LOG.debug("\nsideInput {}\n", t.getValue());
+ LOG.debug("\nsideInput {} {}\n", sideInputIndex, t.getValue());
}
+
PCollectionView<?> sideInput = sideInputs.get(sideInputIndex);
sideInputHandler.addSideInputValue(sideInput, t.getValue());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1ec7cd91/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java
index 06940aa..c9bf6dc 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java
@@ -50,15 +50,17 @@ public interface ApexStreamTuple<T>
*/
class DataTuple<T> implements ApexStreamTuple<T>
{
+ private int unionTag;
private T value;
public static <T> DataTuple<T> of(T value) {
- return new DataTuple<>(value);
+ return new DataTuple<>(value, 0);
}
- private DataTuple(T value)
+ private DataTuple(T value, int unionTag)
{
this.value = value;
+ this.unionTag = unionTag;
}
@Override
@@ -72,6 +74,16 @@ public interface ApexStreamTuple<T>
this.value = value;
}
+ public int getUnionTag()
+ {
+ return unionTag;
+ }
+
+ public void setUnionTag(int unionTag)
+ {
+ this.unionTag = unionTag;
+ }
+
@Override
public String toString()
{
@@ -91,7 +103,7 @@ public interface ApexStreamTuple<T>
public TimestampedTuple(long timestamp, T value)
{
- super(value);
+ super(value, 0);
this.timestamp = timestamp;
}
@@ -152,6 +164,7 @@ public interface ApexStreamTuple<T>
new DataOutputStream(outStream).writeLong(((WatermarkTuple<?>)value).getTimestamp());
} else {
outStream.write(0);
+ outStream.write(((DataTuple<?>)value).unionTag);
valueCoder.encode(value.getValue(), outStream, context);
}
}
@@ -164,7 +177,8 @@ public interface ApexStreamTuple<T>
if (b == 1) {
return new WatermarkTuple<T>(new DataInputStream(inStream).readLong());
} else {
- return new DataTuple<T>(valueCoder.decode(inStream, context));
+ int unionTag = inStream.read();
+ return new DataTuple<T>(valueCoder.decode(inStream, context), unionTag);
}
}