You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/05/09 04:21:26 UTC

[06/13] beam git commit: Remove contexts from coders where they'll never be used.

Remove contexts from coders where they'll never be used.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/996dce37
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/996dce37
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/996dce37

Branch: refs/heads/master
Commit: 996dce37b76b103f104328b7caa65f73a1bcb15a
Parents: 27e9a06
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Fri May 5 16:36:47 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Mon May 8 20:17:56 2017 -0700

----------------------------------------------------------------------
 .../UnboundedReadFromBoundedSource.java         |  4 +--
 .../core/ElementAndRestrictionCoder.java        |  4 +--
 .../beam/runners/core/KeyedWorkItemCoder.java   |  4 +--
 .../beam/runners/core/TimerInternals.java       |  4 +--
 .../apache/beam/sdk/coders/DurationCoder.java   |  4 +--
 .../apache/beam/sdk/coders/InstantCoder.java    |  4 +--
 .../sdk/transforms/ApproximateQuantiles.java    | 20 +++++-------
 .../org/apache/beam/sdk/transforms/Mean.java    |  4 +--
 .../org/apache/beam/sdk/transforms/Top.java     |  4 +--
 .../beam/sdk/transforms/join/CoGbkResult.java   | 10 ++----
 .../transforms/windowing/IntervalWindow.java    |  4 +--
 .../beam/sdk/values/TimestampedValue.java       |  4 +--
 .../beam/sdk/io/kinesis/KinesisRecordCoder.java | 34 +++++++++-----------
 13 files changed, 47 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/996dce37/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
index ae28e3a..b74da80 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
@@ -224,7 +224,7 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PColle
       public void encode(Checkpoint<T> value, OutputStream outStream, Context context)
           throws CoderException, IOException {
         elemsCoder.encode(value.residualElements, outStream);
-        sourceCoder.encode(value.residualSource, outStream, context);
+        sourceCoder.encode(value.residualSource, outStream);
       }
 
       @SuppressWarnings("unchecked")
@@ -233,7 +233,7 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PColle
           throws CoderException, IOException {
         return new Checkpoint<>(
             elemsCoder.decode(inStream),
-            sourceCoder.decode(inStream, context));
+            sourceCoder.decode(inStream));
       }
 
       @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/996dce37/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java
index 5ddd865..fcb1deb 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java
@@ -56,14 +56,14 @@ public class ElementAndRestrictionCoder<ElementT, RestrictionT>
       throw new CoderException("cannot encode a null ElementAndRestriction");
     }
     elementCoder.encode(value.element(), outStream);
-    restrictionCoder.encode(value.restriction(), outStream, context);
+    restrictionCoder.encode(value.restriction(), outStream);
   }
 
   @Override
   public ElementAndRestriction<ElementT, RestrictionT> decode(InputStream inStream, Context context)
       throws IOException {
     ElementT key = elementCoder.decode(inStream);
-    RestrictionT value = restrictionCoder.decode(inStream, context);
+    RestrictionT value = restrictionCoder.decode(inStream);
     return ElementAndRestriction.of(key, value);
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/996dce37/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
index ac8a34c..0869244 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
@@ -74,7 +74,7 @@ public class KeyedWorkItemCoder<K, ElemT> extends StructuredCoder<KeyedWorkItem<
     Coder.Context nestedContext = context.nested();
     keyCoder.encode(value.key(), outStream);
     timersCoder.encode(value.timersIterable(), outStream);
-    elemsCoder.encode(value.elementsIterable(), outStream, context);
+    elemsCoder.encode(value.elementsIterable(), outStream);
   }
 
   @Override
@@ -83,7 +83,7 @@ public class KeyedWorkItemCoder<K, ElemT> extends StructuredCoder<KeyedWorkItem<
     Coder.Context nestedContext = context.nested();
     K key = keyCoder.decode(inStream);
     Iterable<TimerData> timers = timersCoder.decode(inStream);
-    Iterable<WindowedValue<ElemT>> elems = elemsCoder.decode(inStream, context);
+    Iterable<WindowedValue<ElemT>> elems = elemsCoder.decode(inStream);
     return KeyedWorkItems.workItem(key, timers, elems);
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/996dce37/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
index 3607fdd..f0a62cd 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
@@ -244,7 +244,7 @@ public interface TimerInternals {
       STRING_CODER.encode(timer.getTimerId(), outStream);
       STRING_CODER.encode(timer.getNamespace().stringKey(), outStream);
       INSTANT_CODER.encode(timer.getTimestamp(), outStream);
-      STRING_CODER.encode(timer.getDomain().name(), outStream, context);
+      STRING_CODER.encode(timer.getDomain().name(), outStream);
     }
 
     @Override
@@ -255,7 +255,7 @@ public interface TimerInternals {
       StateNamespace namespace =
           StateNamespaces.fromString(STRING_CODER.decode(inStream), windowCoder);
       Instant timestamp = INSTANT_CODER.decode(inStream);
-      TimeDomain domain = TimeDomain.valueOf(STRING_CODER.decode(inStream, context));
+      TimeDomain domain = TimeDomain.valueOf(STRING_CODER.decode(inStream));
       return TimerData.of(timerId, namespace, timestamp, domain);
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/996dce37/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java
index 8b4ae1d..b7db305 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java
@@ -59,13 +59,13 @@ public class DurationCoder extends AtomicCoder<ReadableDuration> {
     if (value == null) {
       throw new CoderException("cannot encode a null ReadableDuration");
     }
-    LONG_CODER.encode(toLong(value), outStream, context);
+    LONG_CODER.encode(toLong(value), outStream);
   }
 
   @Override
   public ReadableDuration decode(InputStream inStream, Context context)
       throws CoderException, IOException {
-      return fromLong(LONG_CODER.decode(inStream, context));
+      return fromLong(LONG_CODER.decode(inStream));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/996dce37/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java
index 000f406..22b11a3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java
@@ -73,13 +73,13 @@ public class InstantCoder extends AtomicCoder<Instant> {
     if (value == null) {
       throw new CoderException("cannot encode a null Instant");
     }
-    LONG_CODER.encode(ORDER_PRESERVING_CONVERTER.convert(value), outStream, context);
+    LONG_CODER.encode(ORDER_PRESERVING_CONVERTER.convert(value), outStream);
   }
 
   @Override
   public Instant decode(InputStream inStream, Context context)
       throws CoderException, IOException {
-    return ORDER_PRESERVING_CONVERTER.reverse().convert(LONG_CODER.decode(inStream, context));
+    return ORDER_PRESERVING_CONVERTER.reverse().convert(LONG_CODER.decode(inStream));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/996dce37/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
index 348cc5f..9b9d3f8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
@@ -681,24 +681,22 @@ public class ApproximateQuantiles {
     public void encode(
         QuantileState<T, ComparatorT> state, OutputStream outStream, Coder.Context context)
         throws CoderException, IOException {
-      Coder.Context nestedContext = context.nested();
       intCoder.encode(state.numQuantiles, outStream);
       intCoder.encode(state.bufferSize, outStream);
       elementCoder.encode(state.min, outStream);
       elementCoder.encode(state.max, outStream);
       elementListCoder.encode(
-          state.unbufferedElements, outStream, nestedContext);
+          state.unbufferedElements, outStream);
       BigEndianIntegerCoder.of().encode(
-          state.buffers.size(), outStream, nestedContext);
+          state.buffers.size(), outStream);
       for (QuantileBuffer<T> buffer : state.buffers) {
-        encodeBuffer(buffer, outStream, nestedContext);
+        encodeBuffer(buffer, outStream);
       }
     }
 
     @Override
     public QuantileState<T, ComparatorT> decode(InputStream inStream, Coder.Context context)
         throws CoderException, IOException {
-      Coder.Context nestedContext = context.nested();
       int numQuantiles = intCoder.decode(inStream);
       int bufferSize = intCoder.decode(inStream);
       T min = elementCoder.decode(inStream);
@@ -709,29 +707,27 @@ public class ApproximateQuantiles {
           BigEndianIntegerCoder.of().decode(inStream);
       List<QuantileBuffer<T>> buffers = new ArrayList<>(numBuffers);
       for (int i = 0; i < numBuffers; i++) {
-        buffers.add(decodeBuffer(inStream, nestedContext));
+        buffers.add(decodeBuffer(inStream));
       }
       return new QuantileState<T, ComparatorT>(
           compareFn, numQuantiles, min, max, numBuffers, bufferSize, unbufferedElements, buffers);
     }
 
-    private void encodeBuffer(
-        QuantileBuffer<T> buffer, OutputStream outStream, Coder.Context context)
+    private void encodeBuffer(QuantileBuffer<T> buffer, OutputStream outStream)
         throws CoderException, IOException {
       DataOutputStream outData = new DataOutputStream(outStream);
       outData.writeInt(buffer.level);
       outData.writeLong(buffer.weight);
-      elementListCoder.encode(buffer.elements, outStream, context);
+      elementListCoder.encode(buffer.elements, outStream);
     }
 
-    private QuantileBuffer<T> decodeBuffer(
-        InputStream inStream, Coder.Context context)
+    private QuantileBuffer<T> decodeBuffer(InputStream inStream)
         throws IOException, CoderException {
       DataInputStream inData = new DataInputStream(inStream);
       return new QuantileBuffer<>(
           inData.readInt(),
           inData.readLong(),
-          elementListCoder.decode(inStream, context));
+          elementListCoder.decode(inStream));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/beam/blob/996dce37/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
index a46a21f..c8e0d95 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
@@ -188,7 +188,7 @@ public class Mean {
      public void encode(CountSum<NumT> value, OutputStream outStream, Coder.Context context)
          throws CoderException, IOException {
        LONG_CODER.encode(value.count, outStream);
-       DOUBLE_CODER.encode(value.sum, outStream, context);
+       DOUBLE_CODER.encode(value.sum, outStream);
      }
 
      @Override
@@ -196,7 +196,7 @@ public class Mean {
          throws CoderException, IOException {
        return new CountSum<>(
            LONG_CODER.decode(inStream),
-           DOUBLE_CODER.decode(inStream, context));
+           DOUBLE_CODER.decode(inStream));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/996dce37/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
index c0381a7..7aec667 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
@@ -541,13 +541,13 @@ public class Top {
     public void encode(
         BoundedHeap<T, ComparatorT> value, OutputStream outStream, Context context)
         throws CoderException, IOException {
-      listCoder.encode(value.asList(), outStream, context);
+      listCoder.encode(value.asList(), outStream);
     }
 
     @Override
     public BoundedHeap<T, ComparatorT> decode(InputStream inStream, Coder.Context context)
         throws CoderException, IOException {
-      return new BoundedHeap<>(maximumSize, compareFn, listCoder.decode(inStream, context));
+      return new BoundedHeap<>(maximumSize, compareFn, listCoder.decode(inStream));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/996dce37/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
index bd669ef..6603325 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
@@ -251,11 +251,9 @@ public class CoGbkResult {
       if (schema.size() == 0) {
         return;
       }
-      int lastIndex = schema.size() - 1;
-      for (int unionTag = 0; unionTag < lastIndex; unionTag++) {
+      for (int unionTag = 0; unionTag < schema.size(); unionTag++) {
         tagListCoder(unionTag).encode(value.valueMap.get(unionTag), outStream);
       }
-      tagListCoder(lastIndex).encode(value.valueMap.get(lastIndex), outStream, context);
     }
 
     @Override
@@ -266,12 +264,10 @@ public class CoGbkResult {
       if (schema.size() == 0) {
         return new CoGbkResult(schema, ImmutableList.<Iterable<?>>of());
       }
-      int lastIndex = schema.size() - 1;
       List<Iterable<?>> valueMap = Lists.newArrayListWithExpectedSize(schema.size());
-      for (int unionTag = 0; unionTag < lastIndex; unionTag++) {
-        valueMap.add(tagListCoder(unionTag).decode(inStream, context.nested()));
+      for (int unionTag = 0; unionTag < schema.size(); unionTag++) {
+        valueMap.add(tagListCoder(unionTag).decode(inStream, Coder.Context.NESTED));
       }
-      valueMap.add(tagListCoder(lastIndex).decode(inStream, context));
       return new CoGbkResult(schema, valueMap);
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/996dce37/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
index cb5a7cf..318dc4c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
@@ -182,14 +182,14 @@ public class IntervalWindow extends BoundedWindow
     public void encode(IntervalWindow window, OutputStream outStream, Context context)
         throws IOException, CoderException {
       instantCoder.encode(window.end, outStream);
-      durationCoder.encode(new Duration(window.start, window.end), outStream, context);
+      durationCoder.encode(new Duration(window.start, window.end), outStream);
     }
 
     @Override
     public IntervalWindow decode(InputStream inStream, Context context)
         throws IOException, CoderException {
       Instant end = instantCoder.decode(inStream);
-      ReadableDuration duration = durationCoder.decode(inStream, context);
+      ReadableDuration duration = durationCoder.decode(inStream);
       return new IntervalWindow(end.minus(duration), end);
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/996dce37/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
index 89747a7..95a3152 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
@@ -106,14 +106,14 @@ public class TimestampedValue<V> {
         throws IOException {
       valueCoder.encode(windowedElem.getValue(), outStream);
       InstantCoder.of().encode(
-          windowedElem.getTimestamp(), outStream, context);
+          windowedElem.getTimestamp(), outStream);
     }
 
     @Override
     public TimestampedValue<T> decode(InputStream inStream, Context context)
         throws IOException {
       T value = valueCoder.decode(inStream);
-      Instant timestamp = InstantCoder.of().decode(inStream, context);
+      Instant timestamp = InstantCoder.of().decode(inStream);
       return TimestampedValue.of(value, timestamp);
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/996dce37/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
index 77fe127..c6a0174 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
@@ -45,28 +45,26 @@ class KinesisRecordCoder extends AtomicCoder<KinesisRecord> {
     @Override
     public void encode(KinesisRecord value, OutputStream outStream, Context context) throws
             IOException {
-        Context nested = context.nested();
-        BYTE_ARRAY_CODER.encode(value.getData().array(), outStream, nested);
-        STRING_CODER.encode(value.getSequenceNumber(), outStream, nested);
-        STRING_CODER.encode(value.getPartitionKey(), outStream, nested);
-        INSTANT_CODER.encode(value.getApproximateArrivalTimestamp(), outStream, nested);
-        VAR_LONG_CODER.encode(value.getSubSequenceNumber(), outStream, nested);
-        INSTANT_CODER.encode(value.getReadTime(), outStream, nested);
-        STRING_CODER.encode(value.getStreamName(), outStream, nested);
-        STRING_CODER.encode(value.getShardId(), outStream, context);
+        BYTE_ARRAY_CODER.encode(value.getData().array(), outStream);
+        STRING_CODER.encode(value.getSequenceNumber(), outStream);
+        STRING_CODER.encode(value.getPartitionKey(), outStream);
+        INSTANT_CODER.encode(value.getApproximateArrivalTimestamp(), outStream);
+        VAR_LONG_CODER.encode(value.getSubSequenceNumber(), outStream);
+        INSTANT_CODER.encode(value.getReadTime(), outStream);
+        STRING_CODER.encode(value.getStreamName(), outStream);
+        STRING_CODER.encode(value.getShardId(), outStream);
     }
 
     @Override
     public KinesisRecord decode(InputStream inStream, Context context) throws IOException {
-        Context nested = context.nested();
-        ByteBuffer data = ByteBuffer.wrap(BYTE_ARRAY_CODER.decode(inStream, nested));
-        String sequenceNumber = STRING_CODER.decode(inStream, nested);
-        String partitionKey = STRING_CODER.decode(inStream, nested);
-        Instant approximateArrivalTimestamp = INSTANT_CODER.decode(inStream, nested);
-        long subSequenceNumber = VAR_LONG_CODER.decode(inStream, nested);
-        Instant readTimestamp = INSTANT_CODER.decode(inStream, nested);
-        String streamName = STRING_CODER.decode(inStream, nested);
-        String shardId = STRING_CODER.decode(inStream, context);
+        ByteBuffer data = ByteBuffer.wrap(BYTE_ARRAY_CODER.decode(inStream));
+        String sequenceNumber = STRING_CODER.decode(inStream);
+        String partitionKey = STRING_CODER.decode(inStream);
+        Instant approximateArrivalTimestamp = INSTANT_CODER.decode(inStream);
+        long subSequenceNumber = VAR_LONG_CODER.decode(inStream);
+        Instant readTimestamp = INSTANT_CODER.decode(inStream);
+        String streamName = STRING_CODER.decode(inStream);
+        String shardId = STRING_CODER.decode(inStream);
         return new KinesisRecord(data, sequenceNumber, subSequenceNumber, partitionKey,
                 approximateArrivalTimestamp, readTimestamp, streamName, shardId
         );