You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/23 15:46:55 UTC

[3/3] flink git commit: [hotfix] Fix Mutable Object window aggregator/Disable Object Copy

[hotfix] Fix Mutable Object window aggregator/Disable Object Copy

This fixes the aggregators to make copies of the objects so that they
work with window operators that are not mutable-object safe.

This also disables object copy in WindowOperator and
NonKeyedWindowOperator.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/712c868e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/712c868e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/712c868e

Branch: refs/heads/release-0.10
Commit: 712c868eb77643ae07542d5a073365e0862a5e97
Parents: 45ab0eb
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Oct 23 13:08:35 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Oct 23 15:46:28 2015 +0200

----------------------------------------------------------------------
 .../aggregation/AggregationFunction.java        |  8 +--
 .../aggregation/ComparableAggregator.java       | 51 ++++++++++----------
 .../functions/aggregation/SumAggregator.java    | 35 +++++++++++---
 .../functions/windowing/FoldWindowFunction.java | 12 ++---
 .../windowing/NonKeyedWindowOperator.java       |  5 +-
 .../operators/windowing/WindowOperator.java     |  5 +-
 6 files changed, 66 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/712c868e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/AggregationFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/AggregationFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/AggregationFunction.java
index 23cca90..ed39103 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/AggregationFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/AggregationFunction.java
@@ -22,13 +22,7 @@ import org.apache.flink.api.common.functions.RichReduceFunction;
 public abstract class AggregationFunction<T> extends RichReduceFunction<T> {
 	private static final long serialVersionUID = 1L;
 
-	public int position;
-
-	public AggregationFunction(int pos) {
-		this.position = pos;
-	}
-
-	public static enum AggregationType {
+	public enum AggregationType {
 		SUM, MIN, MAX, MINBY, MAXBY,
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/712c868e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java
index e5501a0..e70e30a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java
@@ -25,35 +25,39 @@ public class ComparableAggregator<T> extends AggregationFunction<T> {
 
 	private static final long serialVersionUID = 1L;
 
-	public Comparator comparator;
-	public boolean byAggregate;
-	public boolean first;
-	FieldAccessor<T, Object> fieldAccessor;
+	private Comparator comparator;
+	private boolean byAggregate;
+	private boolean first;
+	private final FieldAccessor<T, Object> fieldAccessor;
 	
-	private ComparableAggregator(int pos, AggregationType aggregationType, boolean first) {
-		super(pos);
+	private ComparableAggregator(AggregationType aggregationType, FieldAccessor<T, Object> fieldAccessor, boolean first) {
 		this.comparator = Comparator.getForAggregation(aggregationType);
 		this.byAggregate = (aggregationType == AggregationType.MAXBY) || (aggregationType == AggregationType.MINBY);
 		this.first = first;
+		this.fieldAccessor = fieldAccessor;
 	}
 
-	public ComparableAggregator(int positionToAggregate, TypeInformation<T> typeInfo, AggregationType aggregationType
-			, ExecutionConfig config) {
+	public ComparableAggregator(int positionToAggregate,
+			TypeInformation<T> typeInfo,
+			AggregationType aggregationType,
+			ExecutionConfig config) {
 		this(positionToAggregate, typeInfo, aggregationType, false, config);
 	}
 
-	public ComparableAggregator(int positionToAggregate, TypeInformation<T> typeInfo, AggregationType aggregationType,
-								boolean first, ExecutionConfig config) {
-		this(positionToAggregate, aggregationType, first);
-		this.fieldAccessor = FieldAccessor.create(positionToAggregate, typeInfo, config);
-		this.first = first;
+	public ComparableAggregator(int positionToAggregate,
+			TypeInformation<T> typeInfo,
+			AggregationType aggregationType,
+			boolean first,
+			ExecutionConfig config) {
+		this(aggregationType, FieldAccessor.create(positionToAggregate, typeInfo, config), first);
 	}
 
 	public ComparableAggregator(String field,
-			TypeInformation<T> typeInfo, AggregationType aggregationType, boolean first, ExecutionConfig config) {
-		this(0, aggregationType, first);
-		this.fieldAccessor = FieldAccessor.create(field, typeInfo, config);
-		this.first = first;
+			TypeInformation<T> typeInfo,
+			AggregationType aggregationType,
+			boolean first,
+			ExecutionConfig config) {
+		this(aggregationType, FieldAccessor.create(field, typeInfo, config), first);
 	}
 
 
@@ -66,16 +70,13 @@ public class ComparableAggregator<T> extends AggregationFunction<T> {
 		int c = comparator.isExtremal(o1, o2);
 
 		if (byAggregate) {
-			if (c == 1) {
-				return value1;
-			}
-			if (first) {
-				if (c == 0) {
-					return value1;
-				}
+			// if they are the same we choose based on whether we want the first or last
+			// element with the min/max.
+			if (c == 0) {
+				return first ? value1 : value2;
 			}
 
-			return value2;
+			return c == 1 ? value1 : value2;
 
 		} else {
 			if (c == 0) {

http://git-wip-us.apache.org/repos/asf/flink/blob/712c868e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumAggregator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumAggregator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumAggregator.java
index b045233..8c9cf7a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumAggregator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumAggregator.java
@@ -19,30 +19,53 @@ package org.apache.flink.streaming.api.functions.aggregation;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.streaming.util.FieldAccessor;
 
 public class SumAggregator<T> extends AggregationFunction<T> {
 
 	private static final long serialVersionUID = 1L;
 
-	FieldAccessor<T, Object> fieldAccessor;
-	SumFunction adder;
+	private final FieldAccessor<T, Object> fieldAccessor;
+	private final SumFunction adder;
+	private final TypeSerializer<T> serializer;
+	private final boolean isTuple;
 
 	public SumAggregator(int pos, TypeInformation<T> typeInfo, ExecutionConfig config) {
-		super(pos);
 		fieldAccessor = FieldAccessor.create(pos, typeInfo, config);
 		adder = SumFunction.getForClass(fieldAccessor.getFieldType().getTypeClass());
+		if (typeInfo instanceof TupleTypeInfo) {
+			isTuple = true;
+			serializer = null;
+		} else {
+			isTuple = false;
+			this.serializer = typeInfo.createSerializer(config);
+		}
 	}
 
 	public SumAggregator(String field, TypeInformation<T> typeInfo, ExecutionConfig config) {
-		super(0);
 		fieldAccessor = FieldAccessor.create(field, typeInfo, config);
 		adder = SumFunction.getForClass(fieldAccessor.getFieldType().getTypeClass());
+		if (typeInfo instanceof TupleTypeInfo) {
+			isTuple = true;
+			serializer = null;
+		} else {
+			isTuple = false;
+			this.serializer = typeInfo.createSerializer(config);
+		}
 	}
 
-	@SuppressWarnings("unchecked")
 	@Override
+	@SuppressWarnings("unchecked")
 	public T reduce(T value1, T value2) throws Exception {
-		return fieldAccessor.set(value1, adder.add(fieldAccessor.get(value1), fieldAccessor.get(value2)));
+		if (isTuple) {
+			Tuple result = ((Tuple)value1).copy();
+			return fieldAccessor.set((T) result, adder.add(fieldAccessor.get(value1), fieldAccessor.get(value2)));
+		} else {
+			T result = serializer.copy(value1);
+			return fieldAccessor.set(result, adder.add(fieldAccessor.get(value1), fieldAccessor.get(value2)));
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/712c868e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldWindowFunction.java
index 04d2ac7..1d29e36 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldWindowFunction.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.operators.translation.WrappingFunction;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
 import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
 import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
@@ -34,6 +33,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.ObjectInputStream;
 
 public class FoldWindowFunction<K, W extends Window, T, R>
 		extends WrappingFunction<FoldFunction<T, R>>
@@ -49,9 +49,8 @@ public class FoldWindowFunction<K, W extends Window, T, R>
 		this.initialValue = initialValue;
 	}
 
-	@Override
-	public void open(Configuration configuration) throws Exception {
-		super.open(configuration);
+	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+		in.defaultReadObject();
 
 		if (serializedInitialValue == null) {
 			throw new RuntimeException("No initial value was serialized for the fold " +
@@ -59,10 +58,11 @@ public class FoldWindowFunction<K, W extends Window, T, R>
 		}
 
 		ByteArrayInputStream bais = new ByteArrayInputStream(serializedInitialValue);
-		InputViewDataInputStreamWrapper in = new InputViewDataInputStreamWrapper(
+		InputViewDataInputStreamWrapper inStream = new InputViewDataInputStreamWrapper(
 				new DataInputStream(bais)
 		);
-		initialValue = outSerializer.deserialize(in);
+
+		initialValue = outSerializer.deserialize(inStream);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/712c868e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
index a002b23..03e8c4c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
@@ -231,9 +231,8 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 				context = new Context(window, windowBuffer);
 				windows.put(window, context);
 			}
-			StreamRecord<IN> elementCopy = new StreamRecord<>(inputSerializer.copy(element.getValue()), element.getTimestamp());
-			context.windowBuffer.storeElement(elementCopy);
-			Trigger.TriggerResult triggerResult = trigger.onElement(elementCopy.getValue(), elementCopy.getTimestamp(), window, context);
+			context.windowBuffer.storeElement(element);
+			Trigger.TriggerResult triggerResult = trigger.onElement(element.getValue(), element.getTimestamp(), window, context);
 			processTriggerResult(triggerResult, window);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/712c868e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index a80f971..30ce477 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -276,9 +276,8 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 				context = new Context(key, window, windowBuffer);
 				keyWindows.put(window, context);
 			}
-			StreamRecord<IN> elementCopy = new StreamRecord<>(inputSerializer.copy(element.getValue()), element.getTimestamp());
-			context.windowBuffer.storeElement(elementCopy);
-			Trigger.TriggerResult triggerResult = trigger.onElement(elementCopy.getValue(), elementCopy.getTimestamp(), window, context);
+			context.windowBuffer.storeElement(element);
+			Trigger.TriggerResult triggerResult = trigger.onElement(element.getValue(), element.getTimestamp(), window, context);
 			processTriggerResult(triggerResult, key, window);
 		}
 	}