You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/04/10 19:47:58 UTC
[1/2] beam git commit: BEAM-1053 ApexGroupByKeyOperator serialization issues
Repository: beam
Updated Branches:
refs/heads/master 80d2548f2 -> 836e8e4aa
BEAM-1053 ApexGroupByKeyOperator serialization issues
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/74e31c35
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/74e31c35
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/74e31c35
Branch: refs/heads/master
Commit: 74e31c350986d093be1a0b53d001b3376def8b69
Parents: 80d2548
Author: Thomas Weise <th...@apache.org>
Authored: Sat Apr 8 13:01:01 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Mon Apr 10 10:55:31 2017 -0700
----------------------------------------------------------------------
.../operators/ApexGroupByKeyOperator.java | 26 ++++++++++----------
1 file changed, 13 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/74e31c35/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
index 4551c9c..230082e 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
@@ -25,12 +25,12 @@ import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.Operator;
import com.datatorrent.api.StreamCodec;
import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.netlet.util.Slice;
import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
import com.esotericsoftware.kryo.serializers.JavaSerializer;
import com.google.common.base.Throwables;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
-import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -96,7 +96,7 @@ public class ApexGroupByKeyOperator<K, V> implements Operator {
private final SerializablePipelineOptions serializedOptions;
@Bind(JavaSerializer.class)
private final StateInternalsFactory<K> stateInternalsFactory;
- private Map<ByteBuffer, Set<TimerInternals.TimerData>> activeTimers = new HashMap<>();
+ private Map<Slice, Set<TimerInternals.TimerData>> activeTimers = new HashMap<>();
private transient ProcessContext context;
private transient OldDoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> fn;
@@ -177,18 +177,18 @@ public class ApexGroupByKeyOperator<K, V> implements Operator {
* We keep these timers in a Set, so that they are deduplicated, as the same
* timer can be registered multiple times.
*/
- private Multimap<ByteBuffer, TimerInternals.TimerData> getTimersReadyToProcess(
+ private Multimap<Slice, TimerInternals.TimerData> getTimersReadyToProcess(
long currentWatermark) {
// we keep the timers to return in a different list and launch them later
// because we cannot prevent a trigger from registering another trigger,
// which would lead to concurrent modification exception.
- Multimap<ByteBuffer, TimerInternals.TimerData> toFire = HashMultimap.create();
+ Multimap<Slice, TimerInternals.TimerData> toFire = HashMultimap.create();
- Iterator<Map.Entry<ByteBuffer, Set<TimerInternals.TimerData>>> it =
+ Iterator<Map.Entry<Slice, Set<TimerInternals.TimerData>>> it =
activeTimers.entrySet().iterator();
while (it.hasNext()) {
- Map.Entry<ByteBuffer, Set<TimerInternals.TimerData>> keyWithTimers = it.next();
+ Map.Entry<Slice, Set<TimerInternals.TimerData>> keyWithTimers = it.next();
Iterator<TimerInternals.TimerData> timerIt = keyWithTimers.getValue().iterator();
while (timerIt.hasNext()) {
@@ -226,9 +226,9 @@ public class ApexGroupByKeyOperator<K, V> implements Operator {
}
private void registerActiveTimer(K key, TimerInternals.TimerData timer) {
- final ByteBuffer keyBytes;
+ final Slice keyBytes;
try {
- keyBytes = ByteBuffer.wrap(CoderUtils.encodeToByteArray(keyCoder, key));
+ keyBytes = new Slice(CoderUtils.encodeToByteArray(keyCoder, key));
} catch (CoderException e) {
throw new RuntimeException(e);
}
@@ -241,9 +241,9 @@ public class ApexGroupByKeyOperator<K, V> implements Operator {
}
private void unregisterActiveTimer(K key, TimerInternals.TimerData timer) {
- final ByteBuffer keyBytes;
+ final Slice keyBytes;
try {
- keyBytes = ByteBuffer.wrap(CoderUtils.encodeToByteArray(keyCoder, key));
+ keyBytes = new Slice(CoderUtils.encodeToByteArray(keyCoder, key));
} catch (CoderException e) {
throw new RuntimeException(e);
}
@@ -260,11 +260,11 @@ public class ApexGroupByKeyOperator<K, V> implements Operator {
private void processWatermark(ApexStreamTuple.WatermarkTuple<?> mark) throws Exception {
this.inputWatermark = new Instant(mark.getTimestamp());
- Multimap<ByteBuffer, TimerInternals.TimerData> timers = getTimersReadyToProcess(
+ Multimap<Slice, TimerInternals.TimerData> timers = getTimersReadyToProcess(
mark.getTimestamp());
if (!timers.isEmpty()) {
- for (ByteBuffer keyBytes : timers.keySet()) {
- K key = CoderUtils.decodeFromByteArray(keyCoder, keyBytes.array());
+ for (Slice keyBytes : timers.keySet()) {
+ K key = CoderUtils.decodeFromByteArray(keyCoder, keyBytes.buffer);
KeyedWorkItem<K, V> kwi = KeyedWorkItems.<K, V>timersWorkItem(key, timers.get(keyBytes));
context.setElement(kwi, getStateInternalsForKey(kwi.key()));
fn.processElement(context);
[2/2] beam git commit: This closes #2473
Posted by jk...@apache.org.
This closes #2473
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/836e8e4a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/836e8e4a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/836e8e4a
Branch: refs/heads/master
Commit: 836e8e4aab239dd41f81de2f1553850b3fd3d716
Parents: 80d2548 74e31c3
Author: Eugene Kirpichov <ki...@google.com>
Authored: Mon Apr 10 10:55:50 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Mon Apr 10 10:55:50 2017 -0700
----------------------------------------------------------------------
.../operators/ApexGroupByKeyOperator.java | 26 ++++++++++----------
1 file changed, 13 insertions(+), 13 deletions(-)
----------------------------------------------------------------------