You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/23 15:46:55 UTC
[3/3] flink git commit: [hotfix] Fix Mutable Object window
aggregator/Disable Object Copy
[hotfix] Fix Mutable Object window aggregator/Disable Object Copy
This fixes the aggregators to make copies of the objects so that they
work with window operators that are not mutable-object safe.
This also disables object copy in WindowOperator and
NonKeyedWindowOperator.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/712c868e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/712c868e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/712c868e
Branch: refs/heads/release-0.10
Commit: 712c868eb77643ae07542d5a073365e0862a5e97
Parents: 45ab0eb
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Oct 23 13:08:35 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Oct 23 15:46:28 2015 +0200
----------------------------------------------------------------------
.../aggregation/AggregationFunction.java | 8 +--
.../aggregation/ComparableAggregator.java | 51 ++++++++++----------
.../functions/aggregation/SumAggregator.java | 35 +++++++++++---
.../functions/windowing/FoldWindowFunction.java | 12 ++---
.../windowing/NonKeyedWindowOperator.java | 5 +-
.../operators/windowing/WindowOperator.java | 5 +-
6 files changed, 66 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/712c868e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/AggregationFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/AggregationFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/AggregationFunction.java
index 23cca90..ed39103 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/AggregationFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/AggregationFunction.java
@@ -22,13 +22,7 @@ import org.apache.flink.api.common.functions.RichReduceFunction;
public abstract class AggregationFunction<T> extends RichReduceFunction<T> {
private static final long serialVersionUID = 1L;
- public int position;
-
- public AggregationFunction(int pos) {
- this.position = pos;
- }
-
- public static enum AggregationType {
+ public enum AggregationType {
SUM, MIN, MAX, MINBY, MAXBY,
}
http://git-wip-us.apache.org/repos/asf/flink/blob/712c868e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java
index e5501a0..e70e30a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java
@@ -25,35 +25,39 @@ public class ComparableAggregator<T> extends AggregationFunction<T> {
private static final long serialVersionUID = 1L;
- public Comparator comparator;
- public boolean byAggregate;
- public boolean first;
- FieldAccessor<T, Object> fieldAccessor;
+ private Comparator comparator;
+ private boolean byAggregate;
+ private boolean first;
+ private final FieldAccessor<T, Object> fieldAccessor;
- private ComparableAggregator(int pos, AggregationType aggregationType, boolean first) {
- super(pos);
+ private ComparableAggregator(AggregationType aggregationType, FieldAccessor<T, Object> fieldAccessor, boolean first) {
this.comparator = Comparator.getForAggregation(aggregationType);
this.byAggregate = (aggregationType == AggregationType.MAXBY) || (aggregationType == AggregationType.MINBY);
this.first = first;
+ this.fieldAccessor = fieldAccessor;
}
- public ComparableAggregator(int positionToAggregate, TypeInformation<T> typeInfo, AggregationType aggregationType
- , ExecutionConfig config) {
+ public ComparableAggregator(int positionToAggregate,
+ TypeInformation<T> typeInfo,
+ AggregationType aggregationType,
+ ExecutionConfig config) {
this(positionToAggregate, typeInfo, aggregationType, false, config);
}
- public ComparableAggregator(int positionToAggregate, TypeInformation<T> typeInfo, AggregationType aggregationType,
- boolean first, ExecutionConfig config) {
- this(positionToAggregate, aggregationType, first);
- this.fieldAccessor = FieldAccessor.create(positionToAggregate, typeInfo, config);
- this.first = first;
+ public ComparableAggregator(int positionToAggregate,
+ TypeInformation<T> typeInfo,
+ AggregationType aggregationType,
+ boolean first,
+ ExecutionConfig config) {
+ this(aggregationType, FieldAccessor.create(positionToAggregate, typeInfo, config), first);
}
public ComparableAggregator(String field,
- TypeInformation<T> typeInfo, AggregationType aggregationType, boolean first, ExecutionConfig config) {
- this(0, aggregationType, first);
- this.fieldAccessor = FieldAccessor.create(field, typeInfo, config);
- this.first = first;
+ TypeInformation<T> typeInfo,
+ AggregationType aggregationType,
+ boolean first,
+ ExecutionConfig config) {
+ this(aggregationType, FieldAccessor.create(field, typeInfo, config), first);
}
@@ -66,16 +70,13 @@ public class ComparableAggregator<T> extends AggregationFunction<T> {
int c = comparator.isExtremal(o1, o2);
if (byAggregate) {
- if (c == 1) {
- return value1;
- }
- if (first) {
- if (c == 0) {
- return value1;
- }
+ // if they are the same, we choose based on whether we want the first or last
+ // element with the min/max.
+ if (c == 0) {
+ return first ? value1 : value2;
}
- return value2;
+ return c == 1 ? value1 : value2;
} else {
if (c == 0) {
http://git-wip-us.apache.org/repos/asf/flink/blob/712c868e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumAggregator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumAggregator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumAggregator.java
index b045233..8c9cf7a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumAggregator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumAggregator.java
@@ -19,30 +19,53 @@ package org.apache.flink.streaming.api.functions.aggregation;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.util.FieldAccessor;
public class SumAggregator<T> extends AggregationFunction<T> {
private static final long serialVersionUID = 1L;
- FieldAccessor<T, Object> fieldAccessor;
- SumFunction adder;
+ private final FieldAccessor<T, Object> fieldAccessor;
+ private final SumFunction adder;
+ private final TypeSerializer<T> serializer;
+ private final boolean isTuple;
public SumAggregator(int pos, TypeInformation<T> typeInfo, ExecutionConfig config) {
- super(pos);
fieldAccessor = FieldAccessor.create(pos, typeInfo, config);
adder = SumFunction.getForClass(fieldAccessor.getFieldType().getTypeClass());
+ if (typeInfo instanceof TupleTypeInfo) {
+ isTuple = true;
+ serializer = null;
+ } else {
+ isTuple = false;
+ this.serializer = typeInfo.createSerializer(config);
+ }
}
public SumAggregator(String field, TypeInformation<T> typeInfo, ExecutionConfig config) {
- super(0);
fieldAccessor = FieldAccessor.create(field, typeInfo, config);
adder = SumFunction.getForClass(fieldAccessor.getFieldType().getTypeClass());
+ if (typeInfo instanceof TupleTypeInfo) {
+ isTuple = true;
+ serializer = null;
+ } else {
+ isTuple = false;
+ this.serializer = typeInfo.createSerializer(config);
+ }
}
- @SuppressWarnings("unchecked")
@Override
+ @SuppressWarnings("unchecked")
public T reduce(T value1, T value2) throws Exception {
- return fieldAccessor.set(value1, adder.add(fieldAccessor.get(value1), fieldAccessor.get(value2)));
+ if (isTuple) {
+ Tuple result = ((Tuple)value1).copy();
+ return fieldAccessor.set((T) result, adder.add(fieldAccessor.get(value1), fieldAccessor.get(value2)));
+ } else {
+ T result = serializer.copy(value1);
+ return fieldAccessor.set(result, adder.add(fieldAccessor.get(value1), fieldAccessor.get(value2)));
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/712c868e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldWindowFunction.java
index 04d2ac7..1d29e36 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldWindowFunction.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.operators.translation.WrappingFunction;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
@@ -34,6 +33,7 @@ import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.io.ObjectInputStream;
public class FoldWindowFunction<K, W extends Window, T, R>
extends WrappingFunction<FoldFunction<T, R>>
@@ -49,9 +49,8 @@ public class FoldWindowFunction<K, W extends Window, T, R>
this.initialValue = initialValue;
}
- @Override
- public void open(Configuration configuration) throws Exception {
- super.open(configuration);
+ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+ in.defaultReadObject();
if (serializedInitialValue == null) {
throw new RuntimeException("No initial value was serialized for the fold " +
@@ -59,10 +58,11 @@ public class FoldWindowFunction<K, W extends Window, T, R>
}
ByteArrayInputStream bais = new ByteArrayInputStream(serializedInitialValue);
- InputViewDataInputStreamWrapper in = new InputViewDataInputStreamWrapper(
+ InputViewDataInputStreamWrapper inStream = new InputViewDataInputStreamWrapper(
new DataInputStream(bais)
);
- initialValue = outSerializer.deserialize(in);
+
+ initialValue = outSerializer.deserialize(inStream);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/712c868e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
index a002b23..03e8c4c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
@@ -231,9 +231,8 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
context = new Context(window, windowBuffer);
windows.put(window, context);
}
- StreamRecord<IN> elementCopy = new StreamRecord<>(inputSerializer.copy(element.getValue()), element.getTimestamp());
- context.windowBuffer.storeElement(elementCopy);
- Trigger.TriggerResult triggerResult = trigger.onElement(elementCopy.getValue(), elementCopy.getTimestamp(), window, context);
+ context.windowBuffer.storeElement(element);
+ Trigger.TriggerResult triggerResult = trigger.onElement(element.getValue(), element.getTimestamp(), window, context);
processTriggerResult(triggerResult, window);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/712c868e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index a80f971..30ce477 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -276,9 +276,8 @@ public class WindowOperator<K, IN, OUT, W extends Window>
context = new Context(key, window, windowBuffer);
keyWindows.put(window, context);
}
- StreamRecord<IN> elementCopy = new StreamRecord<>(inputSerializer.copy(element.getValue()), element.getTimestamp());
- context.windowBuffer.storeElement(elementCopy);
- Trigger.TriggerResult triggerResult = trigger.onElement(elementCopy.getValue(), elementCopy.getTimestamp(), window, context);
+ context.windowBuffer.storeElement(element);
+ Trigger.TriggerResult triggerResult = trigger.onElement(element.getValue(), element.getTimestamp(), window, context);
processTriggerResult(triggerResult, key, window);
}
}