You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2017/02/06 15:45:36 UTC
[2/2] flink git commit: [FLINK-5694] [gelly] Collect DataSetAnalytic
[FLINK-5694] [gelly] Collect DataSetAnalytic
Adds a DataSetAnalytic that accumulates the elements of a DataSet and
returns them as a List. This mirrors the implementation of DataSet.collect()
but uses the analytic execution workflow.
This closes #3245
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/49016225
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/49016225
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/49016225
Branch: refs/heads/master
Commit: 4901622594af7b727baf08be6ee82803e7c4e645
Parents: a7e5705
Author: Greg Hogan <co...@greghogan.com>
Authored: Tue Jan 31 12:20:14 2017 -0500
Committer: Greg Hogan <co...@greghogan.com>
Committed: Mon Feb 6 09:44:41 2017 -0500
----------------------------------------------------------------------
.../apache/flink/graph/asm/dataset/Collect.java | 103 +++++++++++++++++++
.../flink/graph/asm/dataset/CollectTest.java | 55 ++++++++++
2 files changed, 158 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/49016225/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/Collect.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/Collect.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/Collect.java
new file mode 100644
index 0000000..4398296
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/Collect.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.dataset;
+
+import org.apache.flink.api.common.accumulators.SerializedListAccumulator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.AnalyticHelper;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Collect the elements of a {@link DataSet} into a {@link List}.
+ *
+ * <p>This mirrors {@link DataSet#collect()} but uses the analytic execution
+ * workflow: {@link #run(DataSet)} attaches an output to the input, and the
+ * elements are read back by {@link #getResult()}.
+ *
+ * <p>Each element is serialized with the input type's serializer and appended
+ * to a {@link SerializedListAccumulator}; the accumulated bytes are
+ * deserialized when the result is requested, so the whole result is
+ * materialized as an in-memory list.
+ *
+ * @param <T> element type
+ */
+public class Collect<T>
+extends AbstractDataSetAnalytic<T, List<T>> {
+
+	// accumulator name shared by the writer (CollectHelper.close) and the reader (getResult)
+	private static final String COLLECT = "collect";
+
+	private CollectHelper<T> collectHelper;
+
+	private TypeSerializer<T> serializer;
+
+	/**
+	 * Attach an output to the input which serializes every element into an
+	 * accumulator.
+	 *
+	 * @param input the DataSet whose elements are collected
+	 * @return this analytic, for chaining
+	 * @throws Exception propagated from the superclass {@code run}
+	 */
+	@Override
+	public Collect<T> run(DataSet<T> input)
+			throws Exception {
+		super.run(input);
+
+		// named distinctly from the inherited env field (read in getResult) to avoid shadowing it
+		ExecutionEnvironment inputEnv = input.getExecutionEnvironment();
+		serializer = input.getType().createSerializer(inputEnv.getConfig());
+
+		collectHelper = new CollectHelper<>(serializer);
+
+		input
+			.output(collectHelper)
+				.name("Collect");
+
+		return this;
+	}
+
+	/**
+	 * Deserialize and return the collected elements.
+	 *
+	 * <p>Must be called after {@link #run(DataSet)}. The elements are retrieved
+	 * from the helper's accumulator via the inherited {@code env}; presumably the
+	 * program must also have been executed first (see AnalyticHelper#getAccumulator).
+	 *
+	 * @return the collected elements
+	 * @throws IllegalStateException if {@link #run(DataSet)} has not been called
+	 *     or the accumulator result could not be retrieved
+	 * @throws RuntimeException if the collected data cannot be deserialized
+	 */
+	@Override
+	public List<T> getResult() {
+		if (collectHelper == null) {
+			throw new IllegalStateException("Collect has not been run on a DataSet; call run() before getResult()");
+		}
+
+		ArrayList<byte[]> accResult = collectHelper.getAccumulator(env, COLLECT);
+
+		if (accResult == null) {
+			throw new IllegalStateException("Unable to retrieve the DataSet");
+		}
+
+		try {
+			return SerializedListAccumulator.deserializeList(accResult, serializer);
+		} catch (ClassNotFoundException e) {
+			throw new RuntimeException("Cannot find type class of collected data type", e);
+		} catch (IOException e) {
+			throw new RuntimeException("Serialization error while deserializing collected data", e);
+		}
+	}
+
+	/**
+	 * Output format which serializes each record into a
+	 * {@link SerializedListAccumulator} and registers that accumulator under
+	 * the name {@code COLLECT} when the format is closed.
+	 *
+	 * @param <U> element type
+	 */
+	private static class CollectHelper<U>
+	extends AnalyticHelper<U> {
+
+		private SerializedListAccumulator<U> accumulator;
+
+		private final TypeSerializer<U> serializer;
+
+		public CollectHelper(TypeSerializer<U> serializer) {
+			this.serializer = serializer;
+		}
+
+		@Override
+		public void open(int taskNumber, int numTasks) {
+			// start from a fresh, empty accumulator each time the format is opened
+			this.accumulator = new SerializedListAccumulator<>();
+		}
+
+		@Override
+		public void writeRecord(U record) throws IOException {
+			accumulator.add(record, serializer);
+		}
+
+		@Override
+		public void close() throws IOException {
+			// registered once, after all records have been written
+			addAccumulator(COLLECT, accumulator);
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/49016225/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CollectTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CollectTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CollectTest.java
new file mode 100644
index 0000000..ec1af42
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CollectTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.dataset;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+
+public class CollectTest {
+
+ private ExecutionEnvironment env;
+
+ @Before
+ public void setup()
+ throws Exception {
+ env = ExecutionEnvironment.createCollectionsEnvironment();
+ env.getConfig().enableObjectReuse();
+ }
+
+ @Test
+ public void testCollect()
+ throws Exception {
+ List<Long> list = Arrays.asList(ArrayUtils.toObject(
+ new long[]{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }));
+
+ DataSet<Long> dataset = env.fromCollection(list);
+
+ List<Long> collected = new Collect<Long>().run(dataset).execute();
+
+ assertArrayEquals(list.toArray(), collected.toArray());
+ }
+}