You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/04/10 16:23:46 UTC
[1/2] flink git commit: [tests] Increase robustness of SimpleRecoveryITCase
Repository: flink
Updated Branches:
refs/heads/master 3246255bc -> 36fcdae58
[tests] Increase robustness of SimpleRecoveryITCase
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/36fcdae5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/36fcdae5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/36fcdae5
Branch: refs/heads/master
Commit: 36fcdae581cdc9578a33d3d8bb7a058080b7bece
Parents: 211d0bd
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Apr 10 15:30:10 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Apr 10 16:22:51 2015 +0200
----------------------------------------------------------------------
.../org/apache/flink/test/recovery/SimpleRecoveryITCase.java | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/36fcdae5/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
index 42b1c15..0361af8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
@@ -199,7 +199,7 @@ public class SimpleRecoveryITCase {
"localhost", cluster.getJobManagerRPCPort());
env.setParallelism(4);
- env.setNumberOfExecutionRetries(3);
+ env.setNumberOfExecutionRetries(5);
env.generateSequence(1, 10)
.rebalance()
@@ -238,7 +238,7 @@ public class SimpleRecoveryITCase {
private static class FailingMapper1<T> extends RichMapFunction<T, T> {
- private static int failuresBeforeSuccess = 1;
+ private static volatile int failuresBeforeSuccess = 1;
@Override
public T map(T value) throws Exception {
@@ -253,7 +253,7 @@ public class SimpleRecoveryITCase {
private static class FailingMapper2<T> extends RichMapFunction<T, T> {
- private static int failuresBeforeSuccess = 1;
+ private static volatile int failuresBeforeSuccess = 1;
@Override
public T map(T value) throws Exception {
@@ -268,7 +268,7 @@ public class SimpleRecoveryITCase {
private static class FailingMapper3<T> extends RichMapFunction<T, T> {
- private static int failuresBeforeSuccess = 3;
+ private static volatile int failuresBeforeSuccess = 3;
@Override
public T map(T value) throws Exception {
[2/2] flink git commit: [FLINK-1862] [apis] Add support for non-serializable types for collect() by switching from Java serialization to Flink serialization
Posted by se...@apache.org.
[FLINK-1862] [apis] Add support for non-serializable types for collect() by switching from Java serialization to Flink serialization
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/211d0bdb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/211d0bdb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/211d0bdb
Branch: refs/heads/master
Commit: 211d0bdbf956f6bf17ea40f72d6aca9cd4fb2fad
Parents: 3246255
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Apr 10 15:02:26 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Apr 10 16:22:51 2015 +0200
----------------------------------------------------------------------
.../accumulators/SerializedListAccumulator.java | 49 ++++++++++----------
.../java/org/apache/flink/api/java/DataSet.java | 18 ++-----
.../java/org/apache/flink/api/java/Utils.java | 12 +++--
.../org/apache/flink/api/scala/DataSet.scala | 19 +++-----
.../test/classloading/jar/KMeansForTest.java | 3 +-
5 files changed, 45 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/211d0bdb/flink-core/src/main/java/org/apache/flink/api/common/accumulators/SerializedListAccumulator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/SerializedListAccumulator.java b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/SerializedListAccumulator.java
index 4ab339b..65a8c39 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/SerializedListAccumulator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/SerializedListAccumulator.java
@@ -18,8 +18,14 @@
package org.apache.flink.api.common.accumulators;
-import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
+import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -37,19 +43,24 @@ public class SerializedListAccumulator<T> implements Accumulator<T, ArrayList<by
private static final long serialVersionUID = 1L;
private ArrayList<byte[]> localValue = new ArrayList<byte[]>();
+
@Override
public void add(T value) {
- if (value == null) {
- throw new NullPointerException("Value to accumulate must nor be null");
- }
-
+ throw new UnsupportedOperationException();
+ }
+
+ public void add(T value, TypeSerializer<T> serializer) throws IOException {
try {
- byte[] byteArray = InstantiationUtil.serializeObject(value);
- localValue.add(byteArray);
+ ByteArrayOutputStream outStream = new ByteArrayOutputStream();
+ OutputViewDataOutputStreamWrapper out =
+ new OutputViewDataOutputStreamWrapper(new DataOutputStream(outStream));
+
+ serializer.serialize(value, out);
+ localValue.add(outStream.toByteArray());
}
catch (IOException e) {
- throw new RuntimeException("Serialization of accumulated value failed", e);
+ throw new IOException("Failed to serialize value '" + value + '\'', e);
}
}
@@ -58,21 +69,6 @@ public class SerializedListAccumulator<T> implements Accumulator<T, ArrayList<by
return localValue;
}
- public ArrayList<T> deserializeLocalValue(ClassLoader classLoader) {
- try {
- ArrayList<T> arrList = new ArrayList<T>(localValue.size());
- for (byte[] byteArr : localValue) {
- @SuppressWarnings("unchecked")
- T item = (T) InstantiationUtil.deserializeObject(byteArr, classLoader);
- arrList.add(item);
- }
- return arrList;
- }
- catch (Exception e) {
- throw new RuntimeException("Cannot deserialize accumulator list element", e);
- }
- }
-
@Override
public void resetLocal() {
localValue.clear();
@@ -91,12 +87,15 @@ public class SerializedListAccumulator<T> implements Accumulator<T, ArrayList<by
}
@SuppressWarnings("unchecked")
- public static <T> List<T> deserializeList(ArrayList<byte[]> data, ClassLoader loader)
+ public static <T> List<T> deserializeList(ArrayList<byte[]> data, TypeSerializer<T> serializer)
throws IOException, ClassNotFoundException
{
List<T> result = new ArrayList<T>(data.size());
for (byte[] bytes : data) {
- result.add((T) InstantiationUtil.deserializeObject(bytes, loader));
+ ByteArrayInputStream inStream = new ByteArrayInputStream(bytes);
+ InputViewDataInputStreamWrapper in = new InputViewDataInputStreamWrapper(new DataInputStream(inStream));
+ T val = serializer.deserialize(in);
+ result.add(val);
}
return result;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/211d0bdb/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index 7cee323..a6a0af8 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -42,6 +42,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint;
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
import org.apache.flink.api.common.operators.base.PartitionOperatorBase.PartitionMethod;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.functions.FirstReducer;
import org.apache.flink.api.java.functions.FormattingMapper;
@@ -411,24 +412,15 @@ public abstract class DataSet<T> {
* @see org.apache.flink.api.java.Utils.CollectHelper
*/
public List<T> collect() throws Exception {
- // validate that our type is actually serializable
- Class<?> typeClass = getType().getTypeClass();
- ClassLoader cl = typeClass.getClassLoader() == null ? ClassLoader.getSystemClassLoader()
- : typeClass.getClassLoader();
-
- if (!java.io.Serializable.class.isAssignableFrom(typeClass)) {
- throw new UnsupportedOperationException("collect() can only be used with serializable data types. "
- + "The DataSet type '" + typeClass.getName() + "' does not implement java.io.Serializable.");
- }
-
final String id = new AbstractID().toString();
-
- this.flatMap(new Utils.CollectHelper<T>(id)).output(new DiscardingOutputFormat<T>());
+ final TypeSerializer<T> serializer = getType().createSerializer(getExecutionEnvironment().getConfig());
+
+ this.flatMap(new Utils.CollectHelper<T>(id, serializer)).output(new DiscardingOutputFormat<T>());
JobExecutionResult res = getExecutionEnvironment().execute();
ArrayList<byte[]> accResult = res.getAccumulatorResult(id);
try {
- return SerializedListAccumulator.deserializeList(accResult, cl);
+ return SerializedListAccumulator.deserializeList(accResult, serializer);
}
catch (ClassNotFoundException e) {
throw new RuntimeException("Cannot find type class of collected data type.", e);
http://git-wip-us.apache.org/repos/asf/flink/blob/211d0bdb/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
index 5351484..38b24a2 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
@@ -22,6 +22,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.accumulators.SerializedListAccumulator;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import java.lang.reflect.Field;
@@ -97,21 +98,24 @@ public class Utils {
private static final long serialVersionUID = 1L;
private final String id;
- private final SerializedListAccumulator<T> accumulator;
+ private final TypeSerializer<T> serializer;
+
+ private SerializedListAccumulator<T> accumulator;
- public CollectHelper(String id) {
+ public CollectHelper(String id, TypeSerializer<T> serializer) {
this.id = id;
- this.accumulator = new SerializedListAccumulator<T>();
+ this.serializer = serializer;
}
@Override
public void open(Configuration parameters) throws Exception {
+ this.accumulator = new SerializedListAccumulator<T>();
getRuntimeContext().addAccumulator(id, accumulator);
}
@Override
public void flatMap(T value, Collector<T> out) throws Exception {
- accumulator.add(value);
+ accumulator.add(value, serializer);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/211d0bdb/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
index 48a5285..3b80a23 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.functions._
import org.apache.flink.api.common.io.{FileOutputFormat, OutputFormat}
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.common.operators.base.PartitionOperatorBase.PartitionMethod
+import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.api.java.Utils.CountHelper
import org.apache.flink.api.java.aggregation.Aggregations
import org.apache.flink.api.java.functions.{FirstReducer, KeySelector}
@@ -537,24 +538,18 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
*/
@throws(classOf[Exception])
def collect(): Seq[T] = {
- val typeClass: Class[_] = getType().getTypeClass()
- val cl: ClassLoader = if (typeClass.getClassLoader == null) ClassLoader.getSystemClassLoader
- else typeClass.getClassLoader
-
- if (typeClass != null && !classOf[java.io.Serializable].isAssignableFrom(typeClass)) {
- throw new UnsupportedOperationException(
- "collect() can only be used with serializable data types. " +
- "The DataSet type '" + typeClass.getName + "' does not implement java.io.Serializable.")
- }
-
val id = new AbstractID().toString
- javaSet.flatMap(new Utils.CollectHelper[T](id)).output(new DiscardingOutputFormat[T])
+ val serializer = getType().createSerializer(getExecutionEnvironment.getConfig)
+
+ javaSet.flatMap(new Utils.CollectHelper[T](id, serializer))
+ .output(new DiscardingOutputFormat[T])
+
val res = getExecutionEnvironment.execute()
val accResult: java.util.ArrayList[Array[Byte]] = res.getAccumulatorResult(id)
try {
- SerializedListAccumulator.deserializeList(accResult, cl).asScala
+ SerializedListAccumulator.deserializeList(accResult, serializer).asScala
}
catch {
case e: ClassNotFoundException => {
http://git-wip-us.apache.org/repos/asf/flink/blob/211d0bdb/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/KMeansForTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/KMeansForTest.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/KMeansForTest.java
index 083b2bc..794efbd 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/KMeansForTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/KMeansForTest.java
@@ -28,7 +28,6 @@ import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
-import java.io.Serializable;
import java.util.Collection;
/**
@@ -107,7 +106,7 @@ public class KMeansForTest {
/**
* A simple two-dimensional point.
*/
- public static class Point implements Serializable {
+ public static class Point {
public double x, y;