You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/04/10 19:47:58 UTC

[1/2] beam git commit: BEAM-1053 ApexGroupByKeyOperator serialization issues

Repository: beam
Updated Branches:
  refs/heads/master 80d2548f2 -> 836e8e4aa


BEAM-1053 ApexGroupByKeyOperator serialization issues


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/74e31c35
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/74e31c35
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/74e31c35

Branch: refs/heads/master
Commit: 74e31c350986d093be1a0b53d001b3376def8b69
Parents: 80d2548
Author: Thomas Weise <th...@apache.org>
Authored: Sat Apr 8 13:01:01 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Mon Apr 10 10:55:31 2017 -0700

----------------------------------------------------------------------
 .../operators/ApexGroupByKeyOperator.java       | 26 ++++++++++----------
 1 file changed, 13 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/74e31c35/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
index 4551c9c..230082e 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
@@ -25,12 +25,12 @@ import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.Operator;
 import com.datatorrent.api.StreamCodec;
 import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.netlet.util.Slice;
 import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
 import com.esotericsoftware.kryo.serializers.JavaSerializer;
 import com.google.common.base.Throwables;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
-import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -96,7 +96,7 @@ public class ApexGroupByKeyOperator<K, V> implements Operator {
   private final SerializablePipelineOptions serializedOptions;
   @Bind(JavaSerializer.class)
   private final StateInternalsFactory<K> stateInternalsFactory;
-  private Map<ByteBuffer, Set<TimerInternals.TimerData>> activeTimers = new HashMap<>();
+  private Map<Slice, Set<TimerInternals.TimerData>> activeTimers = new HashMap<>();
 
   private transient ProcessContext context;
   private transient OldDoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> fn;
@@ -177,18 +177,18 @@ public class ApexGroupByKeyOperator<K, V> implements Operator {
    * We keep these timers in a Set, so that they are deduplicated, as the same
    * timer can be registered multiple times.
    */
-  private Multimap<ByteBuffer, TimerInternals.TimerData> getTimersReadyToProcess(
+  private Multimap<Slice, TimerInternals.TimerData> getTimersReadyToProcess(
       long currentWatermark) {
 
     // we keep the timers to return in a different list and launch them later
     // because we cannot prevent a trigger from registering another trigger,
     // which would lead to concurrent modification exception.
-    Multimap<ByteBuffer, TimerInternals.TimerData> toFire = HashMultimap.create();
+    Multimap<Slice, TimerInternals.TimerData> toFire = HashMultimap.create();
 
-    Iterator<Map.Entry<ByteBuffer, Set<TimerInternals.TimerData>>> it =
+    Iterator<Map.Entry<Slice, Set<TimerInternals.TimerData>>> it =
         activeTimers.entrySet().iterator();
     while (it.hasNext()) {
-      Map.Entry<ByteBuffer, Set<TimerInternals.TimerData>> keyWithTimers = it.next();
+      Map.Entry<Slice, Set<TimerInternals.TimerData>> keyWithTimers = it.next();
 
       Iterator<TimerInternals.TimerData> timerIt = keyWithTimers.getValue().iterator();
       while (timerIt.hasNext()) {
@@ -226,9 +226,9 @@ public class ApexGroupByKeyOperator<K, V> implements Operator {
   }
 
   private void registerActiveTimer(K key, TimerInternals.TimerData timer) {
-    final ByteBuffer keyBytes;
+    final Slice keyBytes;
     try {
-      keyBytes = ByteBuffer.wrap(CoderUtils.encodeToByteArray(keyCoder, key));
+      keyBytes = new Slice(CoderUtils.encodeToByteArray(keyCoder, key));
     } catch (CoderException e) {
       throw new RuntimeException(e);
     }
@@ -241,9 +241,9 @@ public class ApexGroupByKeyOperator<K, V> implements Operator {
   }
 
   private void unregisterActiveTimer(K key, TimerInternals.TimerData timer) {
-    final ByteBuffer keyBytes;
+    final Slice keyBytes;
     try {
-      keyBytes = ByteBuffer.wrap(CoderUtils.encodeToByteArray(keyCoder, key));
+      keyBytes = new Slice(CoderUtils.encodeToByteArray(keyCoder, key));
     } catch (CoderException e) {
       throw new RuntimeException(e);
     }
@@ -260,11 +260,11 @@ public class ApexGroupByKeyOperator<K, V> implements Operator {
 
   private void processWatermark(ApexStreamTuple.WatermarkTuple<?> mark) throws Exception {
     this.inputWatermark = new Instant(mark.getTimestamp());
-    Multimap<ByteBuffer, TimerInternals.TimerData> timers = getTimersReadyToProcess(
+    Multimap<Slice, TimerInternals.TimerData> timers = getTimersReadyToProcess(
         mark.getTimestamp());
     if (!timers.isEmpty()) {
-      for (ByteBuffer keyBytes : timers.keySet()) {
-        K key = CoderUtils.decodeFromByteArray(keyCoder, keyBytes.array());
+      for (Slice keyBytes : timers.keySet()) {
+        K key = CoderUtils.decodeFromByteArray(keyCoder, keyBytes.buffer);
         KeyedWorkItem<K, V> kwi = KeyedWorkItems.<K, V>timersWorkItem(key, timers.get(keyBytes));
         context.setElement(kwi, getStateInternalsForKey(kwi.key()));
         fn.processElement(context);


[2/2] beam git commit: This closes #2473

Posted by jk...@apache.org.
This closes #2473


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/836e8e4a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/836e8e4a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/836e8e4a

Branch: refs/heads/master
Commit: 836e8e4aab239dd41f81de2f1553850b3fd3d716
Parents: 80d2548 74e31c3
Author: Eugene Kirpichov <ki...@google.com>
Authored: Mon Apr 10 10:55:50 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Mon Apr 10 10:55:50 2017 -0700

----------------------------------------------------------------------
 .../operators/ApexGroupByKeyOperator.java       | 26 ++++++++++----------
 1 file changed, 13 insertions(+), 13 deletions(-)
----------------------------------------------------------------------