You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/05/09 04:21:26 UTC
[06/13] beam git commit: Remove contexts from coders where they'll
never be used.
Remove contexts from coders where they'll never be used.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/996dce37
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/996dce37
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/996dce37
Branch: refs/heads/master
Commit: 996dce37b76b103f104328b7caa65f73a1bcb15a
Parents: 27e9a06
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Fri May 5 16:36:47 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Mon May 8 20:17:56 2017 -0700
----------------------------------------------------------------------
.../UnboundedReadFromBoundedSource.java | 4 +--
.../core/ElementAndRestrictionCoder.java | 4 +--
.../beam/runners/core/KeyedWorkItemCoder.java | 4 +--
.../beam/runners/core/TimerInternals.java | 4 +--
.../apache/beam/sdk/coders/DurationCoder.java | 4 +--
.../apache/beam/sdk/coders/InstantCoder.java | 4 +--
.../sdk/transforms/ApproximateQuantiles.java | 20 +++++-------
.../org/apache/beam/sdk/transforms/Mean.java | 4 +--
.../org/apache/beam/sdk/transforms/Top.java | 4 +--
.../beam/sdk/transforms/join/CoGbkResult.java | 10 ++----
.../transforms/windowing/IntervalWindow.java | 4 +--
.../beam/sdk/values/TimestampedValue.java | 4 +--
.../beam/sdk/io/kinesis/KinesisRecordCoder.java | 34 +++++++++-----------
13 files changed, 47 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/996dce37/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
index ae28e3a..b74da80 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
@@ -224,7 +224,7 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PColle
public void encode(Checkpoint<T> value, OutputStream outStream, Context context)
throws CoderException, IOException {
elemsCoder.encode(value.residualElements, outStream);
- sourceCoder.encode(value.residualSource, outStream, context);
+ sourceCoder.encode(value.residualSource, outStream);
}
@SuppressWarnings("unchecked")
@@ -233,7 +233,7 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PColle
throws CoderException, IOException {
return new Checkpoint<>(
elemsCoder.decode(inStream),
- sourceCoder.decode(inStream, context));
+ sourceCoder.decode(inStream));
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/996dce37/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java
index 5ddd865..fcb1deb 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java
@@ -56,14 +56,14 @@ public class ElementAndRestrictionCoder<ElementT, RestrictionT>
throw new CoderException("cannot encode a null ElementAndRestriction");
}
elementCoder.encode(value.element(), outStream);
- restrictionCoder.encode(value.restriction(), outStream, context);
+ restrictionCoder.encode(value.restriction(), outStream);
}
@Override
public ElementAndRestriction<ElementT, RestrictionT> decode(InputStream inStream, Context context)
throws IOException {
ElementT key = elementCoder.decode(inStream);
- RestrictionT value = restrictionCoder.decode(inStream, context);
+ RestrictionT value = restrictionCoder.decode(inStream);
return ElementAndRestriction.of(key, value);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/996dce37/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
index ac8a34c..0869244 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
@@ -74,7 +74,7 @@ public class KeyedWorkItemCoder<K, ElemT> extends StructuredCoder<KeyedWorkItem<
Coder.Context nestedContext = context.nested();
keyCoder.encode(value.key(), outStream);
timersCoder.encode(value.timersIterable(), outStream);
- elemsCoder.encode(value.elementsIterable(), outStream, context);
+ elemsCoder.encode(value.elementsIterable(), outStream);
}
@Override
@@ -83,7 +83,7 @@ public class KeyedWorkItemCoder<K, ElemT> extends StructuredCoder<KeyedWorkItem<
Coder.Context nestedContext = context.nested();
K key = keyCoder.decode(inStream);
Iterable<TimerData> timers = timersCoder.decode(inStream);
- Iterable<WindowedValue<ElemT>> elems = elemsCoder.decode(inStream, context);
+ Iterable<WindowedValue<ElemT>> elems = elemsCoder.decode(inStream);
return KeyedWorkItems.workItem(key, timers, elems);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/996dce37/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
index 3607fdd..f0a62cd 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
@@ -244,7 +244,7 @@ public interface TimerInternals {
STRING_CODER.encode(timer.getTimerId(), outStream);
STRING_CODER.encode(timer.getNamespace().stringKey(), outStream);
INSTANT_CODER.encode(timer.getTimestamp(), outStream);
- STRING_CODER.encode(timer.getDomain().name(), outStream, context);
+ STRING_CODER.encode(timer.getDomain().name(), outStream);
}
@Override
@@ -255,7 +255,7 @@ public interface TimerInternals {
StateNamespace namespace =
StateNamespaces.fromString(STRING_CODER.decode(inStream), windowCoder);
Instant timestamp = INSTANT_CODER.decode(inStream);
- TimeDomain domain = TimeDomain.valueOf(STRING_CODER.decode(inStream, context));
+ TimeDomain domain = TimeDomain.valueOf(STRING_CODER.decode(inStream));
return TimerData.of(timerId, namespace, timestamp, domain);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/996dce37/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java
index 8b4ae1d..b7db305 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java
@@ -59,13 +59,13 @@ public class DurationCoder extends AtomicCoder<ReadableDuration> {
if (value == null) {
throw new CoderException("cannot encode a null ReadableDuration");
}
- LONG_CODER.encode(toLong(value), outStream, context);
+ LONG_CODER.encode(toLong(value), outStream);
}
@Override
public ReadableDuration decode(InputStream inStream, Context context)
throws CoderException, IOException {
- return fromLong(LONG_CODER.decode(inStream, context));
+ return fromLong(LONG_CODER.decode(inStream));
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/996dce37/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java
index 000f406..22b11a3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java
@@ -73,13 +73,13 @@ public class InstantCoder extends AtomicCoder<Instant> {
if (value == null) {
throw new CoderException("cannot encode a null Instant");
}
- LONG_CODER.encode(ORDER_PRESERVING_CONVERTER.convert(value), outStream, context);
+ LONG_CODER.encode(ORDER_PRESERVING_CONVERTER.convert(value), outStream);
}
@Override
public Instant decode(InputStream inStream, Context context)
throws CoderException, IOException {
- return ORDER_PRESERVING_CONVERTER.reverse().convert(LONG_CODER.decode(inStream, context));
+ return ORDER_PRESERVING_CONVERTER.reverse().convert(LONG_CODER.decode(inStream));
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/996dce37/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
index 348cc5f..9b9d3f8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
@@ -681,24 +681,22 @@ public class ApproximateQuantiles {
public void encode(
QuantileState<T, ComparatorT> state, OutputStream outStream, Coder.Context context)
throws CoderException, IOException {
- Coder.Context nestedContext = context.nested();
intCoder.encode(state.numQuantiles, outStream);
intCoder.encode(state.bufferSize, outStream);
elementCoder.encode(state.min, outStream);
elementCoder.encode(state.max, outStream);
elementListCoder.encode(
- state.unbufferedElements, outStream, nestedContext);
+ state.unbufferedElements, outStream);
BigEndianIntegerCoder.of().encode(
- state.buffers.size(), outStream, nestedContext);
+ state.buffers.size(), outStream);
for (QuantileBuffer<T> buffer : state.buffers) {
- encodeBuffer(buffer, outStream, nestedContext);
+ encodeBuffer(buffer, outStream);
}
}
@Override
public QuantileState<T, ComparatorT> decode(InputStream inStream, Coder.Context context)
throws CoderException, IOException {
- Coder.Context nestedContext = context.nested();
int numQuantiles = intCoder.decode(inStream);
int bufferSize = intCoder.decode(inStream);
T min = elementCoder.decode(inStream);
@@ -709,29 +707,27 @@ public class ApproximateQuantiles {
BigEndianIntegerCoder.of().decode(inStream);
List<QuantileBuffer<T>> buffers = new ArrayList<>(numBuffers);
for (int i = 0; i < numBuffers; i++) {
- buffers.add(decodeBuffer(inStream, nestedContext));
+ buffers.add(decodeBuffer(inStream));
}
return new QuantileState<T, ComparatorT>(
compareFn, numQuantiles, min, max, numBuffers, bufferSize, unbufferedElements, buffers);
}
- private void encodeBuffer(
- QuantileBuffer<T> buffer, OutputStream outStream, Coder.Context context)
+ private void encodeBuffer(QuantileBuffer<T> buffer, OutputStream outStream)
throws CoderException, IOException {
DataOutputStream outData = new DataOutputStream(outStream);
outData.writeInt(buffer.level);
outData.writeLong(buffer.weight);
- elementListCoder.encode(buffer.elements, outStream, context);
+ elementListCoder.encode(buffer.elements, outStream);
}
- private QuantileBuffer<T> decodeBuffer(
- InputStream inStream, Coder.Context context)
+ private QuantileBuffer<T> decodeBuffer(InputStream inStream)
throws IOException, CoderException {
DataInputStream inData = new DataInputStream(inStream);
return new QuantileBuffer<>(
inData.readInt(),
inData.readLong(),
- elementListCoder.decode(inStream, context));
+ elementListCoder.decode(inStream));
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/996dce37/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
index a46a21f..c8e0d95 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
@@ -188,7 +188,7 @@ public class Mean {
public void encode(CountSum<NumT> value, OutputStream outStream, Coder.Context context)
throws CoderException, IOException {
LONG_CODER.encode(value.count, outStream);
- DOUBLE_CODER.encode(value.sum, outStream, context);
+ DOUBLE_CODER.encode(value.sum, outStream);
}
@Override
@@ -196,7 +196,7 @@ public class Mean {
throws CoderException, IOException {
return new CountSum<>(
LONG_CODER.decode(inStream),
- DOUBLE_CODER.decode(inStream, context));
+ DOUBLE_CODER.decode(inStream));
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/996dce37/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
index c0381a7..7aec667 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
@@ -541,13 +541,13 @@ public class Top {
public void encode(
BoundedHeap<T, ComparatorT> value, OutputStream outStream, Context context)
throws CoderException, IOException {
- listCoder.encode(value.asList(), outStream, context);
+ listCoder.encode(value.asList(), outStream);
}
@Override
public BoundedHeap<T, ComparatorT> decode(InputStream inStream, Coder.Context context)
throws CoderException, IOException {
- return new BoundedHeap<>(maximumSize, compareFn, listCoder.decode(inStream, context));
+ return new BoundedHeap<>(maximumSize, compareFn, listCoder.decode(inStream));
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/996dce37/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
index bd669ef..6603325 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
@@ -251,11 +251,9 @@ public class CoGbkResult {
if (schema.size() == 0) {
return;
}
- int lastIndex = schema.size() - 1;
- for (int unionTag = 0; unionTag < lastIndex; unionTag++) {
+ for (int unionTag = 0; unionTag < schema.size(); unionTag++) {
tagListCoder(unionTag).encode(value.valueMap.get(unionTag), outStream);
}
- tagListCoder(lastIndex).encode(value.valueMap.get(lastIndex), outStream, context);
}
@Override
@@ -266,12 +264,10 @@ public class CoGbkResult {
if (schema.size() == 0) {
return new CoGbkResult(schema, ImmutableList.<Iterable<?>>of());
}
- int lastIndex = schema.size() - 1;
List<Iterable<?>> valueMap = Lists.newArrayListWithExpectedSize(schema.size());
- for (int unionTag = 0; unionTag < lastIndex; unionTag++) {
- valueMap.add(tagListCoder(unionTag).decode(inStream, context.nested()));
+ for (int unionTag = 0; unionTag < schema.size(); unionTag++) {
+ valueMap.add(tagListCoder(unionTag).decode(inStream, Coder.Context.NESTED));
}
- valueMap.add(tagListCoder(lastIndex).decode(inStream, context));
return new CoGbkResult(schema, valueMap);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/996dce37/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
index cb5a7cf..318dc4c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
@@ -182,14 +182,14 @@ public class IntervalWindow extends BoundedWindow
public void encode(IntervalWindow window, OutputStream outStream, Context context)
throws IOException, CoderException {
instantCoder.encode(window.end, outStream);
- durationCoder.encode(new Duration(window.start, window.end), outStream, context);
+ durationCoder.encode(new Duration(window.start, window.end), outStream);
}
@Override
public IntervalWindow decode(InputStream inStream, Context context)
throws IOException, CoderException {
Instant end = instantCoder.decode(inStream);
- ReadableDuration duration = durationCoder.decode(inStream, context);
+ ReadableDuration duration = durationCoder.decode(inStream);
return new IntervalWindow(end.minus(duration), end);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/996dce37/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
index 89747a7..95a3152 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
@@ -106,14 +106,14 @@ public class TimestampedValue<V> {
throws IOException {
valueCoder.encode(windowedElem.getValue(), outStream);
InstantCoder.of().encode(
- windowedElem.getTimestamp(), outStream, context);
+ windowedElem.getTimestamp(), outStream);
}
@Override
public TimestampedValue<T> decode(InputStream inStream, Context context)
throws IOException {
T value = valueCoder.decode(inStream);
- Instant timestamp = InstantCoder.of().decode(inStream, context);
+ Instant timestamp = InstantCoder.of().decode(inStream);
return TimestampedValue.of(value, timestamp);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/996dce37/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
index 77fe127..c6a0174 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
@@ -45,28 +45,26 @@ class KinesisRecordCoder extends AtomicCoder<KinesisRecord> {
@Override
public void encode(KinesisRecord value, OutputStream outStream, Context context) throws
IOException {
- Context nested = context.nested();
- BYTE_ARRAY_CODER.encode(value.getData().array(), outStream, nested);
- STRING_CODER.encode(value.getSequenceNumber(), outStream, nested);
- STRING_CODER.encode(value.getPartitionKey(), outStream, nested);
- INSTANT_CODER.encode(value.getApproximateArrivalTimestamp(), outStream, nested);
- VAR_LONG_CODER.encode(value.getSubSequenceNumber(), outStream, nested);
- INSTANT_CODER.encode(value.getReadTime(), outStream, nested);
- STRING_CODER.encode(value.getStreamName(), outStream, nested);
- STRING_CODER.encode(value.getShardId(), outStream, context);
+ BYTE_ARRAY_CODER.encode(value.getData().array(), outStream);
+ STRING_CODER.encode(value.getSequenceNumber(), outStream);
+ STRING_CODER.encode(value.getPartitionKey(), outStream);
+ INSTANT_CODER.encode(value.getApproximateArrivalTimestamp(), outStream);
+ VAR_LONG_CODER.encode(value.getSubSequenceNumber(), outStream);
+ INSTANT_CODER.encode(value.getReadTime(), outStream);
+ STRING_CODER.encode(value.getStreamName(), outStream);
+ STRING_CODER.encode(value.getShardId(), outStream);
}
@Override
public KinesisRecord decode(InputStream inStream, Context context) throws IOException {
- Context nested = context.nested();
- ByteBuffer data = ByteBuffer.wrap(BYTE_ARRAY_CODER.decode(inStream, nested));
- String sequenceNumber = STRING_CODER.decode(inStream, nested);
- String partitionKey = STRING_CODER.decode(inStream, nested);
- Instant approximateArrivalTimestamp = INSTANT_CODER.decode(inStream, nested);
- long subSequenceNumber = VAR_LONG_CODER.decode(inStream, nested);
- Instant readTimestamp = INSTANT_CODER.decode(inStream, nested);
- String streamName = STRING_CODER.decode(inStream, nested);
- String shardId = STRING_CODER.decode(inStream, context);
+ ByteBuffer data = ByteBuffer.wrap(BYTE_ARRAY_CODER.decode(inStream));
+ String sequenceNumber = STRING_CODER.decode(inStream);
+ String partitionKey = STRING_CODER.decode(inStream);
+ Instant approximateArrivalTimestamp = INSTANT_CODER.decode(inStream);
+ long subSequenceNumber = VAR_LONG_CODER.decode(inStream);
+ Instant readTimestamp = INSTANT_CODER.decode(inStream);
+ String streamName = STRING_CODER.decode(inStream);
+ String shardId = STRING_CODER.decode(inStream);
return new KinesisRecord(data, sequenceNumber, subSequenceNumber, partitionKey,
approximateArrivalTimestamp, readTimestamp, streamName, shardId
);