You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2017/02/06 15:45:36 UTC

[2/2] flink git commit: [FLINK-5694] [gelly] Collect DataSetAnalytic

[FLINK-5694] [gelly] Collect DataSetAnalytic

Adds a DataSetAnalytic that accumulates the elements of a DataSet and
returns them as a List. This mirrors the implementation of DataSet.collect()
but runs within the analytic execution workflow.

This closes #3245


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/49016225
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/49016225
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/49016225

Branch: refs/heads/master
Commit: 4901622594af7b727baf08be6ee82803e7c4e645
Parents: a7e5705
Author: Greg Hogan <co...@greghogan.com>
Authored: Tue Jan 31 12:20:14 2017 -0500
Committer: Greg Hogan <co...@greghogan.com>
Committed: Mon Feb 6 09:44:41 2017 -0500

----------------------------------------------------------------------
 .../apache/flink/graph/asm/dataset/Collect.java | 103 +++++++++++++++++++
 .../flink/graph/asm/dataset/CollectTest.java    |  55 ++++++++++
 2 files changed, 158 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/49016225/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/Collect.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/Collect.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/Collect.java
new file mode 100644
index 0000000..4398296
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/Collect.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.dataset;
+
+import org.apache.flink.api.common.accumulators.SerializedListAccumulator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.AnalyticHelper;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Collect the elements of a {@link DataSet} into a {@link List}.
+ *
+ * @param <T> element type
+ */
+public class Collect<T>
+extends AbstractDataSetAnalytic<T, List<T>> {
+
+	private static final String COLLECT = "collect";
+
+	private CollectHelper<T> collectHelper;
+
+	private TypeSerializer<T> serializer;
+
+	@Override
+	public Collect<T> run(DataSet<T> input)
+			throws Exception {
+		super.run(input);
+
+		// deliberately not named "env": that would shadow the inherited field read by getResult()
+		ExecutionEnvironment inputEnv = input.getExecutionEnvironment();
+		serializer = input.getType().createSerializer(inputEnv.getConfig());
+		collectHelper = new CollectHelper<>(serializer);
+		input.output(collectHelper).name("Collect");
+
+		return this;
+	}
+
+	/** Returns the collected elements; throws {@link IllegalStateException} if run() was not called. */
+	@Override
+	public List<T> getResult() {
+		if (collectHelper == null) {
+			throw new IllegalStateException("run(DataSet) must be called before getResult()");
+		}
+		ArrayList<byte[]> accResult = collectHelper.getAccumulator(env, COLLECT);
+		if (accResult == null) {
+			throw new RuntimeException("Unable to retrieve the DataSet");
+		}
+		try {
+			return SerializedListAccumulator.deserializeList(accResult, serializer);
+		} catch (ClassNotFoundException e) {
+			throw new RuntimeException("Cannot find type class of collected data type", e);
+		} catch (IOException e) {
+			throw new RuntimeException("Serialization error while deserializing collected data", e);
+		}
+	}
+
+	/** Serializes each record into a list accumulator, registered under the COLLECT name on close(). */
+	private static class CollectHelper<U>
+	extends AnalyticHelper<U> {
+		private SerializedListAccumulator<U> accumulator;
+		private final TypeSerializer<U> serializer;
+
+		public CollectHelper(TypeSerializer<U> serializer) {
+			this.serializer = serializer;
+		}
+
+		@Override
+		public void open(int taskNumber, int numTasks) {
+			this.accumulator = new SerializedListAccumulator<>();
+		}
+
+		@Override
+		public void writeRecord(U record) throws IOException {
+			accumulator.add(record, serializer);
+		}
+
+		@Override
+		public void close() throws IOException {
+			addAccumulator(COLLECT, accumulator);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/49016225/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CollectTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CollectTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CollectTest.java
new file mode 100644
index 0000000..ec1af42
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CollectTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.dataset;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+
+public class CollectTest {
+
+	private ExecutionEnvironment env;
+
+	@Before
+	public void setup()
+			throws Exception {
+		env = ExecutionEnvironment.createCollectionsEnvironment();
+		env.getConfig().enableObjectReuse();
+	}
+
+	@Test
+	public void testCollect()
+			throws Exception {
+		List<Long> list = Arrays.asList(ArrayUtils.toObject(
+			new long[]{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }));
+
+		DataSet<Long> dataset = env.fromCollection(list);
+
+		List<Long> collected = new Collect<Long>().run(dataset).execute();
+
+		assertArrayEquals(list.toArray(), collected.toArray());
+	}
+}