You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2015/04/02 20:08:30 UTC
[3/5] flink git commit: Added support for Apache Tez as an execution
environment
http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkBroadcastEdge.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkBroadcastEdge.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkBroadcastEdge.java
new file mode 100644
index 0000000..6597733
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkBroadcastEdge.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.dag;
+
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.tez.util.EncodingUtils;
+import org.apache.flink.tez.util.FlinkSerialization;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.serializer.WritableSerialization;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.runtime.library.conf.UnorderedKVEdgeConfig;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A {@link FlinkEdge} with broadcast semantics: every record produced by the
+ * source vertex is shipped to all parallel instances of the target vertex.
+ */
+public class FlinkBroadcastEdge extends FlinkEdge {
+
+ public FlinkBroadcastEdge(FlinkVertex source, FlinkVertex target, TypeSerializer<?> typeSerializer) {
+ super(source, target, typeSerializer);
+ }
+
+ /**
+ * Creates (and caches) the Tez broadcast edge between the source and target
+ * Tez vertices.
+ *
+ * <p>The Flink {@link TypeSerializer} is encoded into both the value
+ * serialization properties and the input configuration so that the Tez
+ * runtime side ({@code FlinkSerialization}) can reconstruct it.
+ *
+ * @param tezConf the Tez configuration the edge configuration is seeded from
+ * @return the created Tez {@link Edge}; also stored in {@code cached}
+ * @throws CompilerException if building the Tez edge fails
+ */
+ @Override
+ public Edge createEdge(TezConfiguration tezConf) {
+
+ // Ship the Flink serializer to the runtime as an encoded string property.
+ Map<String,String> serializerMap = new HashMap<String,String>();
+ serializerMap.put("io.flink.typeserializer", EncodingUtils.encodeObjectToString(this.typeSerializer));
+
+ try {
+ UnorderedKVEdgeConfig edgeConfig =
+ (UnorderedKVEdgeConfig
+ .newBuilder(IntWritable.class.getName(), typeSerializer.createInstance().getClass().getName())
+ .setFromConfiguration(tezConf)
+ .setKeySerializationClass(WritableSerialization.class.getName(), null)
+ .setValueSerializationClass(FlinkSerialization.class.getName(), serializerMap)
+ .configureInput()
+ .setAdditionalConfiguration("io.flink.typeserializer", EncodingUtils.encodeObjectToString(this.typeSerializer))
+ )
+ .done()
+ .build();
+
+ EdgeProperty property = edgeConfig.createDefaultBroadcastEdgeProperty();
+ this.cached = Edge.create(source.getVertex(), target.getVertex(), property);
+ return cached;
+
+ } catch (Exception e) {
+ // Fixed: the message previously said "Forward Edge" (copy-paste from
+ // FlinkForwardEdge) although this class creates a broadcast edge.
+ throw new CompilerException(
+ "An error occurred while creating a Tez Broadcast Edge: " + e.getMessage(), e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkDataSinkVertex.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkDataSinkVertex.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkDataSinkVertex.java
new file mode 100644
index 0000000..e3ddb9e
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkDataSinkVertex.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.dag;
+
+
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.tez.runtime.DataSinkProcessor;
+import org.apache.flink.tez.runtime.TezTaskConfig;
+import org.apache.flink.tez.util.EncodingUtils;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.Vertex;
+
+import java.io.IOException;
+
+/**
+ * DAG vertex that executes Flink's data sink logic inside a Tez
+ * {@code DataSinkProcessor}.
+ */
+public class FlinkDataSinkVertex extends FlinkVertex {
+
+ public FlinkDataSinkVertex(String taskName, int parallelism, TezTaskConfig taskConfig) {
+ super(taskName, parallelism, taskConfig);
+ }
+
+ /**
+ * Builds (and caches) the Tez vertex for this sink.
+ *
+ * @param conf the Tez configuration the processor payload is derived from
+ * @return the created Tez {@link Vertex}
+ * @throws CompilerException if the vertex could not be created
+ */
+ @Override
+ public Vertex createVertex(TezConfiguration conf) {
+ try {
+ // Flush the DAG-assembly bookkeeping into the task config before
+ // it is serialized below.
+ this.writeInputPositionsToConfig();
+ this.writeSubTasksInOutputToConfig();
+
+ // Encode the task config into the Tez configuration; the processor
+ // restores it from its user payload at runtime.
+ conf.set(TezTaskConfig.TEZ_TASK_CONFIG, EncodingUtils.encodeObjectToString(taskConfig));
+
+ ProcessorDescriptor sinkDescriptor =
+ ProcessorDescriptor.create(DataSinkProcessor.class.getName());
+ sinkDescriptor.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
+
+ this.cached = Vertex.create(this.getUniqueName(), sinkDescriptor, this.getParallelism());
+ return this.cached;
+ }
+ catch (IOException e) {
+ throw new CompilerException(
+ "An error occurred while creating a Tez Vertex: " + e.getMessage(), e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkDataSourceVertex.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkDataSourceVertex.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkDataSourceVertex.java
new file mode 100644
index 0000000..913b854
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkDataSourceVertex.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.dag;
+
+
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.tez.runtime.DataSourceProcessor;
+import org.apache.flink.tez.runtime.TezTaskConfig;
+import org.apache.flink.tez.runtime.input.FlinkInput;
+import org.apache.flink.tez.runtime.input.FlinkInputSplitGenerator;
+import org.apache.flink.tez.util.EncodingUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.DataSourceDescriptor;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.InputInitializerDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.Vertex;
+
+import java.io.IOException;
+
+/**
+ * DAG vertex that executes Flink's data source logic inside a Tez
+ * {@code DataSourceProcessor}, with input splits generated by
+ * {@code FlinkInputSplitGenerator} and consumed through {@code FlinkInput}.
+ */
+public class FlinkDataSourceVertex extends FlinkVertex {
+
+ public FlinkDataSourceVertex(String taskName, int parallelism, TezTaskConfig taskConfig) {
+ super(taskName, parallelism, taskConfig);
+ }
+
+
+ /**
+ * Builds (and caches) the Tez vertex for this data source and attaches the
+ * Tez data-source descriptor to it.
+ *
+ * @param conf the Tez configuration the user payloads are derived from
+ * @return the created Tez {@link Vertex}
+ * @throws CompilerException if the vertex could not be created
+ */
+ @Override
+ public Vertex createVertex (TezConfiguration conf) {
+ try {
+ // Flush DAG-assembly bookkeeping into the task config before it is serialized.
+ this.writeInputPositionsToConfig();
+ this.writeSubTasksInOutputToConfig();
+
+ // Record this vertex's unique name so the runtime can identify its processor,
+ // then encode the task config into the Tez configuration.
+ taskConfig.setDatasourceProcessorName(this.getUniqueName());
+ conf.set(TezTaskConfig.TEZ_TASK_CONFIG, EncodingUtils.encodeObjectToString(taskConfig));
+
+ ProcessorDescriptor descriptor = ProcessorDescriptor.create(
+ DataSourceProcessor.class.getName());
+
+ descriptor.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
+
+ InputDescriptor inputDescriptor = InputDescriptor.create(FlinkInput.class.getName());
+
+ // The split generator also receives the configuration as payload, so both the
+ // processor and the initializer see the same serialized task config.
+ InputInitializerDescriptor inputInitializerDescriptor =
+ InputInitializerDescriptor.create(FlinkInputSplitGenerator.class.getName()).setUserPayload(TezUtils.createUserPayloadFromConf(conf));
+
+ // Empty credentials: no secrets are attached to this data source here.
+ DataSourceDescriptor dataSourceDescriptor = DataSourceDescriptor.create(
+ inputDescriptor,
+ inputInitializerDescriptor,
+ new Credentials()
+ );
+
+ cached = Vertex.create(this.getUniqueName(), descriptor, getParallelism());
+
+ cached.addDataSource("Input " + this.getUniqueName(), dataSourceDescriptor);
+
+ return cached;
+ }
+ catch (IOException e) {
+ throw new CompilerException(
+ "An error occurred while creating a Tez Vertex: " + e.getMessage(), e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkEdge.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkEdge.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkEdge.java
new file mode 100644
index 0000000..181e675
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkEdge.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.dag;
+
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.TezConfiguration;
+
+/**
+ * Base class for edges of the Flink-on-Tez DAG. An edge connects a source and
+ * a target {@link FlinkVertex} and carries the Flink {@link TypeSerializer}
+ * for the records transported over it. Subclasses decide the Tez edge
+ * semantics (forward, broadcast, partitioned).
+ */
+public abstract class FlinkEdge {
+
+ // Endpoints of this edge in the Flink DAG.
+ protected FlinkVertex source;
+ protected FlinkVertex target;
+ // Serializer for the records transported over this edge.
+ protected TypeSerializer<?> typeSerializer;
+ // The Tez edge produced by createEdge(); null until createEdge() is called.
+ protected Edge cached;
+
+ protected FlinkEdge(FlinkVertex source, FlinkVertex target, TypeSerializer<?> typeSerializer) {
+ this.source = source;
+ this.target = target;
+ this.typeSerializer = typeSerializer;
+ }
+
+ // Creates the corresponding Tez edge; implementations store it in 'cached'.
+ public abstract Edge createEdge(TezConfiguration tezConf);
+
+ // Returns the previously created Tez edge, or null if createEdge() has not run yet.
+ public Edge getEdge () {
+ return cached;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkForwardEdge.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkForwardEdge.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkForwardEdge.java
new file mode 100644
index 0000000..4602e96
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkForwardEdge.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.dag;
+
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.tez.util.EncodingUtils;
+import org.apache.flink.tez.util.FlinkSerialization;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.serializer.WritableSerialization;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.runtime.library.conf.UnorderedKVEdgeConfig;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A {@link FlinkEdge} with forward (one-to-one) semantics: subtask i of the
+ * source vertex sends its records to subtask i of the target vertex.
+ */
+public class FlinkForwardEdge extends FlinkEdge {
+
+ public FlinkForwardEdge(FlinkVertex source, FlinkVertex target, TypeSerializer<?> typeSerializer) {
+ super(source, target, typeSerializer);
+ }
+
+ /**
+ * Creates (and caches) the Tez one-to-one edge between the source and
+ * target Tez vertices.
+ *
+ * @param tezConf the Tez configuration the edge configuration is seeded from
+ * @return the created Tez {@link Edge}; also stored in {@code cached}
+ * @throws CompilerException if building the Tez edge fails
+ */
+ @Override
+ public Edge createEdge(TezConfiguration tezConf) {
+
+ // Ship the Flink serializer to the runtime as an encoded string property;
+ // it is set both on the value serialization and on the input configuration.
+ Map<String,String> serializerMap = new HashMap<String,String>();
+ serializerMap.put("io.flink.typeserializer", EncodingUtils.encodeObjectToString(this.typeSerializer));
+
+ try {
+ UnorderedKVEdgeConfig edgeConfig =
+ (UnorderedKVEdgeConfig
+ .newBuilder(IntWritable.class.getName(), typeSerializer.createInstance().getClass().getName())
+ .setFromConfiguration(tezConf)
+ .setKeySerializationClass(WritableSerialization.class.getName(), null)
+ .setValueSerializationClass(FlinkSerialization.class.getName(), serializerMap)
+ .configureInput()
+ .setAdditionalConfiguration("io.flink.typeserializer", EncodingUtils.encodeObjectToString(
+ this.typeSerializer
+ )))
+ .done()
+ .build();
+
+ EdgeProperty property = edgeConfig.createDefaultOneToOneEdgeProperty();
+ this.cached = Edge.create(source.getVertex(), target.getVertex(), property);
+ return cached;
+
+ } catch (Exception e) {
+ throw new CompilerException(
+ "An error occurred while creating a Tez Forward Edge: " + e.getMessage(), e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkPartitionEdge.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkPartitionEdge.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkPartitionEdge.java
new file mode 100644
index 0000000..b5f8c2e
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkPartitionEdge.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.dag;
+
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.tez.util.EncodingUtils;
+import org.apache.flink.tez.util.FlinkSerialization;
+import org.apache.flink.tez.runtime.output.SimplePartitioner;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.serializer.WritableSerialization;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.runtime.library.conf.UnorderedPartitionedKVEdgeConfig;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A {@link FlinkEdge} with partitioning (shuffle) semantics: records are
+ * routed to target subtasks by a {@code SimplePartitioner} keyed on the
+ * {@link IntWritable} key.
+ */
+public class FlinkPartitionEdge extends FlinkEdge {
+
+ public FlinkPartitionEdge(FlinkVertex source, FlinkVertex target, TypeSerializer<?> typeSerializer) {
+ super(source, target, typeSerializer);
+ }
+
+ /**
+ * Creates (and caches) the Tez partitioned (scatter-gather style) edge
+ * between the source and target Tez vertices.
+ *
+ * @param tezConf the Tez configuration the edge configuration is seeded from
+ * @return the created Tez {@link Edge}; also stored in {@code cached}
+ * @throws CompilerException if building the Tez edge fails
+ */
+ @Override
+ public Edge createEdge(TezConfiguration tezConf) {
+
+ // Ship the Flink serializer to the runtime as an encoded string property;
+ // it is set both on the value serialization and on the input configuration.
+ Map<String,String> serializerMap = new HashMap<String,String>();
+ serializerMap.put("io.flink.typeserializer", EncodingUtils.encodeObjectToString(this.typeSerializer));
+
+ try {
+ UnorderedPartitionedKVEdgeConfig edgeConfig =
+ (UnorderedPartitionedKVEdgeConfig
+ .newBuilder(IntWritable.class.getName(), typeSerializer.createInstance().getClass().getName(), SimplePartitioner.class.getName())
+ .setFromConfiguration(tezConf)
+ .setKeySerializationClass(WritableSerialization.class.getName(), null)
+ .setValueSerializationClass(FlinkSerialization.class.getName(), serializerMap)
+ .configureInput()
+ .setAdditionalConfiguration("io.flink.typeserializer", EncodingUtils.encodeObjectToString(this.typeSerializer)))
+ .done()
+ .build();
+
+
+ EdgeProperty property = edgeConfig.createDefaultEdgeProperty();
+ this.cached = Edge.create(source.getVertex(), target.getVertex(), property);
+ return cached;
+
+ } catch (Exception e) {
+ throw new CompilerException(
+ "An error occurred while creating a Tez Shuffle Edge: " + e.getMessage(), e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkProcessorVertex.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkProcessorVertex.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkProcessorVertex.java
new file mode 100644
index 0000000..2fbba36
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkProcessorVertex.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.dag;
+
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.tez.runtime.RegularProcessor;
+import org.apache.flink.tez.runtime.TezTaskConfig;
+import org.apache.flink.tez.util.EncodingUtils;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.Vertex;
+
+import java.io.IOException;
+
+
+/**
+ * DAG vertex that executes a regular (non-source, non-sink) Flink task inside
+ * a Tez {@code RegularProcessor}.
+ */
+public class FlinkProcessorVertex extends FlinkVertex {
+
+ public FlinkProcessorVertex(String taskName, int parallelism, TezTaskConfig taskConfig) {
+ super(taskName, parallelism, taskConfig);
+ }
+
+ /**
+ * Builds (and caches) the Tez vertex for this task.
+ *
+ * @param conf the Tez configuration the processor payload is derived from
+ * @return the created Tez {@link Vertex}
+ * @throws CompilerException if the vertex could not be created
+ */
+ @Override
+ public Vertex createVertex(TezConfiguration conf) {
+ try {
+ // Flush the DAG-assembly bookkeeping into the task config before
+ // it is serialized below.
+ this.writeInputPositionsToConfig();
+ this.writeSubTasksInOutputToConfig();
+
+ // Encode the task config into the Tez configuration; the processor
+ // restores it from its user payload at runtime.
+ conf.set(TezTaskConfig.TEZ_TASK_CONFIG, EncodingUtils.encodeObjectToString(taskConfig));
+
+ ProcessorDescriptor processorDescriptor =
+ ProcessorDescriptor.create(RegularProcessor.class.getName());
+ processorDescriptor.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
+
+ this.cached = Vertex.create(this.getUniqueName(), processorDescriptor, this.getParallelism());
+ return this.cached;
+ } catch (IOException e) {
+ throw new CompilerException(
+ "An error occurred while creating a Tez Vertex: " + e.getMessage(), e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkUnionVertex.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkUnionVertex.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkUnionVertex.java
new file mode 100644
index 0000000..0cf9990
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkUnionVertex.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.dag;
+
+
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.tez.runtime.UnionProcessor;
+import org.apache.flink.tez.runtime.TezTaskConfig;
+import org.apache.flink.tez.util.EncodingUtils;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.Vertex;
+
+import java.io.IOException;
+
+/**
+ * DAG vertex that merges several inputs into one stream via a Tez
+ * {@code UnionProcessor}.
+ */
+public class FlinkUnionVertex extends FlinkVertex {
+
+ public FlinkUnionVertex(String taskName, int parallelism, TezTaskConfig taskConfig) {
+ super(taskName, parallelism, taskConfig);
+ }
+
+ /**
+ * Builds (and caches) the Tez vertex for this union.
+ *
+ * @param conf the Tez configuration the processor payload is derived from
+ * @return the created Tez {@link Vertex}
+ * @throws CompilerException if the vertex could not be created
+ */
+ @Override
+ public Vertex createVertex(TezConfiguration conf) {
+ try {
+ // Flush the DAG-assembly bookkeeping into the task config before
+ // it is serialized below.
+ this.writeInputPositionsToConfig();
+ this.writeSubTasksInOutputToConfig();
+
+ // Encode the task config into the Tez configuration; the processor
+ // restores it from its user payload at runtime.
+ conf.set(TezTaskConfig.TEZ_TASK_CONFIG, EncodingUtils.encodeObjectToString(taskConfig));
+
+ ProcessorDescriptor unionDescriptor =
+ ProcessorDescriptor.create(UnionProcessor.class.getName());
+ unionDescriptor.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
+
+ this.cached = Vertex.create(this.getUniqueName(), unionDescriptor, this.getParallelism());
+ return this.cached;
+ }
+ catch (IOException e) {
+ throw new CompilerException(
+ "An error occurred while creating a Tez Vertex: " + e.getMessage(), e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkVertex.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkVertex.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkVertex.java
new file mode 100644
index 0000000..883acc6
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkVertex.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.dag;
+
+
+import org.apache.flink.tez.runtime.TezTaskConfig;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.Vertex;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Base class for vertices of the Flink-on-Tez DAG. Tracks the task name,
+ * parallelism and {@link TezTaskConfig}, plus Tez-specific bookkeeping
+ * (unique DAG name, input positions per producer, subtask counts per output)
+ * that is written into the task config before the Tez vertex is created.
+ */
+public abstract class FlinkVertex {
+
+ // The Tez vertex produced by createVertex(); null until createVertex() is called.
+ protected Vertex cached;
+ private String taskName;
+ private int parallelism;
+ protected TezTaskConfig taskConfig;
+
+ // Tez-specific bookkeeping
+ protected String uniqueName; //Unique name in DAG
+ // For each producer vertex, the list of input positions it feeds on this vertex.
+ private Map<FlinkVertex,ArrayList<Integer>> inputPositions;
+ // Per output position, the number of subtasks of the consumer (-1 = not set).
+ private ArrayList<Integer> numberOfSubTasksInOutputs;
+
+ public TezTaskConfig getConfig() {
+ return taskConfig;
+ }
+
+ public FlinkVertex(String taskName, int parallelism, TezTaskConfig taskConfig) {
+ this.cached = null;
+ this.taskName = taskName;
+ this.parallelism = parallelism;
+ this.taskConfig = taskConfig;
+ // Append a random UUID so vertices with the same task name stay distinct in the DAG.
+ this.uniqueName = taskName + UUID.randomUUID().toString();
+ this.inputPositions = new HashMap<FlinkVertex, ArrayList<Integer>>();
+ this.numberOfSubTasksInOutputs = new ArrayList<Integer>();
+ }
+
+ public int getParallelism () {
+ return parallelism;
+ }
+
+ public void setParallelism (int parallelism) {
+ this.parallelism = parallelism;
+ }
+
+ // Creates the corresponding Tez vertex; implementations store it in 'cached'.
+ public abstract Vertex createVertex (TezConfiguration conf);
+
+ // Returns the previously created Tez vertex, or null if createVertex() has not run yet.
+ public Vertex getVertex () {
+ return cached;
+ }
+
+ protected String getUniqueName () {
+ return uniqueName;
+ }
+
+ // Records that 'vertex' feeds this vertex at input 'position'; a producer
+ // may feed several positions, hence the list per producer.
+ public void addInput (FlinkVertex vertex, int position) {
+ if (inputPositions.containsKey(vertex)) {
+ inputPositions.get(vertex).add(position);
+ }
+ else {
+ ArrayList<Integer> lst = new ArrayList<Integer>();
+ lst.add(position);
+ inputPositions.put(vertex,lst);
+ }
+ }
+
+ // Records the consumer's subtask count for output 'position', growing the
+ // list with -1 placeholders until index 'position' exists.
+ public void addNumberOfSubTasksInOutput (int subTasks, int position) {
+ // NOTE(review): this initial -1 looks redundant — the padding loop below
+ // already grows an empty list to cover 'position'; confirm before removing.
+ if (numberOfSubTasksInOutputs.isEmpty()) {
+ numberOfSubTasksInOutputs.add(-1);
+ }
+ int currSize = numberOfSubTasksInOutputs.size();
+ for (int i = currSize; i <= position; i++) {
+ numberOfSubTasksInOutputs.add(i, -1);
+ }
+ numberOfSubTasksInOutputs.set(position, subTasks);
+ }
+
+ // Must be called before taskConfig is written to Tez configuration
+ protected void writeInputPositionsToConfig () {
+ // Keyed by unique vertex name (FlinkVertex itself is not serialized).
+ HashMap<String,ArrayList<Integer>> toWrite = new HashMap<String, ArrayList<Integer>>();
+ for (FlinkVertex v: inputPositions.keySet()) {
+ String name = v.getUniqueName();
+ List<Integer> positions = inputPositions.get(v);
+ toWrite.put(name, new ArrayList<Integer>(positions));
+ }
+ this.taskConfig.setInputPositions(toWrite);
+ }
+
+ // Must be called before taskConfig is written to Tez configuration
+ protected void writeSubTasksInOutputToConfig () {
+ this.taskConfig.setNumberSubtasksInOutput(this.numberOfSubTasksInOutputs);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/TezDAGGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/TezDAGGenerator.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/TezDAGGenerator.java
new file mode 100644
index 0000000..52f39be
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/TezDAGGenerator.java
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.dag;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.flink.api.common.distributions.DataDistribution;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.dag.TempMode;
+import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.optimizer.plan.NAryUnionPlanNode;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.PlanNode;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.plan.SourcePlanNode;
+import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.runtime.operators.util.LocalStrategy;
+import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.tez.runtime.TezTaskConfig;
+import org.apache.flink.util.Visitor;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.TezConfiguration;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+
+public class TezDAGGenerator implements Visitor<PlanNode> {
+
+ private static final Log LOG = LogFactory.getLog(TezDAGGenerator.class);
+
+ private Map<PlanNode, FlinkVertex> vertices; // a map from optimizer nodes to Tez vertices
+ private List<FlinkEdge> edges;
+ private final int defaultMaxFan;
+ private final TezConfiguration tezConf;
+
+ private final float defaultSortSpillingThreshold;
+
+ public TezDAGGenerator (TezConfiguration tezConf, Configuration config) {
+ this.defaultMaxFan = config.getInteger(ConfigConstants.DEFAULT_SPILLING_MAX_FAN_KEY,
+ ConfigConstants.DEFAULT_SPILLING_MAX_FAN);
+ this.defaultSortSpillingThreshold = config.getFloat(ConfigConstants.DEFAULT_SORT_SPILLING_THRESHOLD_KEY,
+ ConfigConstants.DEFAULT_SORT_SPILLING_THRESHOLD);
+ this.tezConf = tezConf;
+ }
+
+ public DAG createDAG (OptimizedPlan program) throws Exception {
+ LOG.info ("Creating Tez DAG");
+ this.vertices = new HashMap<PlanNode, FlinkVertex>();
+ this.edges = new ArrayList<FlinkEdge>();
+ program.accept(this);
+
+ DAG dag = DAG.create(program.getJobName());
+ for (FlinkVertex v : vertices.values()) {
+ dag.addVertex(v.createVertex(new TezConfiguration(tezConf)));
+ }
+ for (FlinkEdge e: edges) {
+ dag.addEdge(e.createEdge(new TezConfiguration(tezConf)));
+ }
+
+ /*
+ * Temporarily throw an error until TEZ-1190 has been fixed or a workaround has been created
+ */
+ if (containsSelfJoins()) {
+ throw new CompilerException("Dual-input operators with the same input (self-joins) are not yet supported");
+ }
+
+ this.vertices = null;
+ this.edges = null;
+
+ LOG.info ("Tez DAG created");
+ return dag;
+ }
+
+
+ @Override
+ public boolean preVisit(PlanNode node) {
+ if (this.vertices.containsKey(node)) {
+ // return false to prevent further descend
+ return false;
+ }
+
+ if ((node instanceof BulkIterationPlanNode) || (node instanceof WorksetIterationPlanNode)) {
+ throw new CompilerException("Iterations are not yet supported by the Tez execution environment");
+ }
+
+ if ( (node.getBroadcastInputs() != null) && (!node.getBroadcastInputs().isEmpty())) {
+ throw new CompilerException("Broadcast inputs are not yet supported by the Tez execution environment");
+ }
+
+ FlinkVertex vertex = null;
+
+ try {
+ if (node instanceof SourcePlanNode) {
+ vertex = createDataSourceVertex ((SourcePlanNode) node);
+ }
+ else if (node instanceof SinkPlanNode) {
+ vertex = createDataSinkVertex ((SinkPlanNode) node);
+ }
+ else if ((node instanceof SingleInputPlanNode)) {
+ vertex = createSingleInputVertex((SingleInputPlanNode) node);
+ }
+ else if (node instanceof DualInputPlanNode) {
+ vertex = createDualInputVertex((DualInputPlanNode) node);
+ }
+ else if (node instanceof NAryUnionPlanNode) {
+ vertex = createUnionVertex ((NAryUnionPlanNode) node);
+ }
+ else {
+ throw new CompilerException("Unrecognized node type: " + node.getClass().getName());
+ }
+
+ }
+ catch (Exception e) {
+ throw new CompilerException("Error translating node '" + node + "': " + e.getMessage(), e);
+ }
+
+ if (vertex != null) {
+ this.vertices.put(node, vertex);
+ }
+ return true;
+ }
+
+ @Override
+ public void postVisit (PlanNode node) {
+ try {
+ if (node instanceof SourcePlanNode) {
+ return;
+ }
+ final Iterator<Channel> inConns = node.getInputs().iterator();
+ if (!inConns.hasNext()) {
+ throw new CompilerException("Bug: Found a non-source task with no input.");
+ }
+ int inputIndex = 0;
+
+ FlinkVertex targetVertex = this.vertices.get(node);
+ TezTaskConfig targetVertexConfig = targetVertex.getConfig();
+
+
+ while (inConns.hasNext()) {
+ Channel input = inConns.next();
+ inputIndex += translateChannel(input, inputIndex, targetVertex, targetVertexConfig, false);
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ throw new CompilerException(
+ "An error occurred while translating the optimized plan to a Tez DAG: " + e.getMessage(), e);
+ }
+ }
+
+ private FlinkVertex createSingleInputVertex(SingleInputPlanNode node) throws CompilerException, IOException {
+
+ final String taskName = node.getNodeName();
+ final DriverStrategy ds = node.getDriverStrategy();
+ final int dop = node.getParallelism();
+
+ final TezTaskConfig config= new TezTaskConfig(new Configuration());
+
+ config.setDriver(ds.getDriverClass());
+ config.setDriverStrategy(ds);
+ config.setStubWrapper(node.getProgramOperator().getUserCodeWrapper());
+ config.setStubParameters(node.getProgramOperator().getParameters());
+
+ for(int i=0;i<ds.getNumRequiredComparators();i++) {
+ config.setDriverComparator(node.getComparator(i), i);
+ }
+ assignDriverResources(node, config);
+
+ return new FlinkProcessorVertex(taskName, dop, config);
+ }
+
+ private FlinkVertex createDualInputVertex(DualInputPlanNode node) throws CompilerException, IOException {
+ final String taskName = node.getNodeName();
+ final DriverStrategy ds = node.getDriverStrategy();
+ final int dop = node.getParallelism();
+
+ final TezTaskConfig config= new TezTaskConfig(new Configuration());
+
+ config.setDriver(ds.getDriverClass());
+ config.setDriverStrategy(ds);
+ config.setStubWrapper(node.getProgramOperator().getUserCodeWrapper());
+ config.setStubParameters(node.getProgramOperator().getParameters());
+
+ if (node.getComparator1() != null) {
+ config.setDriverComparator(node.getComparator1(), 0);
+ }
+ if (node.getComparator2() != null) {
+ config.setDriverComparator(node.getComparator2(), 1);
+ }
+ if (node.getPairComparator() != null) {
+ config.setDriverPairComparator(node.getPairComparator());
+ }
+
+ assignDriverResources(node, config);
+
+ LOG.info("Creating processor vertex " + taskName + " with parallelism " + dop);
+
+ return new FlinkProcessorVertex(taskName, dop, config);
+ }
+
+ private FlinkVertex createDataSinkVertex(SinkPlanNode node) throws CompilerException, IOException {
+ final String taskName = node.getNodeName();
+ final int dop = node.getParallelism();
+
+ final TezTaskConfig config = new TezTaskConfig(new Configuration());
+
+ // set user code
+ config.setStubWrapper(node.getProgramOperator().getUserCodeWrapper());
+ config.setStubParameters(node.getProgramOperator().getParameters());
+
+ LOG.info("Creating data sink vertex " + taskName + " with parallelism " + dop);
+
+ return new FlinkDataSinkVertex(taskName, dop, config);
+ }
+
+ private FlinkVertex createDataSourceVertex(SourcePlanNode node) throws CompilerException, IOException {
+ final String taskName = node.getNodeName();
+ int dop = node.getParallelism();
+
+ final TezTaskConfig config= new TezTaskConfig(new Configuration());
+
+ config.setStubWrapper(node.getProgramOperator().getUserCodeWrapper());
+ config.setStubParameters(node.getProgramOperator().getParameters());
+
+ InputFormat format = node.getDataSourceNode().getOperator().getFormatWrapper().getUserCodeObject();
+
+ config.setInputFormat(format);
+
+ // Create as many data sources as input splits
+ InputSplit[] splits = format.createInputSplits((dop > 0) ? dop : 1);
+ dop = splits.length;
+
+ LOG.info("Creating data source vertex " + taskName + " with parallelism " + dop);
+
+ return new FlinkDataSourceVertex(taskName, dop, config);
+ }
+
+ private FlinkVertex createUnionVertex(NAryUnionPlanNode node) throws CompilerException, IOException {
+ final String taskName = node.getNodeName();
+ final int dop = node.getParallelism();
+ final TezTaskConfig config= new TezTaskConfig(new Configuration());
+
+ LOG.info("Creating union vertex " + taskName + " with parallelism " + dop);
+
+ return new FlinkUnionVertex (taskName, dop, config);
+ }
+
+
+ private void assignDriverResources(PlanNode node, TaskConfig config) {
+ final double relativeMem = node.getRelativeMemoryPerSubTask();
+ if (relativeMem > 0) {
+ config.setRelativeMemoryDriver(relativeMem);
+ config.setFilehandlesDriver(this.defaultMaxFan);
+ config.setSpillingThresholdDriver(this.defaultSortSpillingThreshold);
+ }
+ }
+
+ private void assignLocalStrategyResources(Channel c, TaskConfig config, int inputNum) {
+ if (c.getRelativeMemoryLocalStrategy() > 0) {
+ config.setRelativeMemoryInput(inputNum, c.getRelativeMemoryLocalStrategy());
+ config.setFilehandlesInput(inputNum, this.defaultMaxFan);
+ config.setSpillingThresholdInput(inputNum, this.defaultSortSpillingThreshold);
+ }
+ }
+
+ private int translateChannel(Channel input, int inputIndex, FlinkVertex targetVertex,
+ TezTaskConfig targetVertexConfig, boolean isBroadcast) throws Exception
+ {
+ final PlanNode inputPlanNode = input.getSource();
+ final Iterator<Channel> allInChannels;
+
+
+ allInChannels = Collections.singletonList(input).iterator();
+
+
+ // check that the type serializer is consistent
+ TypeSerializerFactory<?> typeSerFact = null;
+
+ while (allInChannels.hasNext()) {
+ final Channel inConn = allInChannels.next();
+
+ if (typeSerFact == null) {
+ typeSerFact = inConn.getSerializer();
+ } else if (!typeSerFact.equals(inConn.getSerializer())) {
+ throw new CompilerException("Conflicting types in union operator.");
+ }
+
+ final PlanNode sourceNode = inConn.getSource();
+ FlinkVertex sourceVertex = this.vertices.get(sourceNode);
+ TezTaskConfig sourceVertexConfig = sourceVertex.getConfig(); //TODO ??? need to create a new TezConfig ???
+
+ connectJobVertices(
+ inConn, inputIndex, sourceVertex, sourceVertexConfig, targetVertex, targetVertexConfig, isBroadcast);
+ }
+
+ // the local strategy is added only once. in non-union case that is the actual edge,
+ // in the union case, it is the edge between union and the target node
+ addLocalInfoFromChannelToConfig(input, targetVertexConfig, inputIndex, isBroadcast);
+ return 1;
+ }
+
+ private void connectJobVertices(Channel channel, int inputNumber,
+ final FlinkVertex sourceVertex, final TezTaskConfig sourceConfig,
+ final FlinkVertex targetVertex, final TezTaskConfig targetConfig, boolean isBroadcast)
+ throws CompilerException {
+
+ // -------------- configure the source task's ship strategy strategies in task config --------------
+ final int outputIndex = sourceConfig.getNumOutputs();
+ sourceConfig.addOutputShipStrategy(channel.getShipStrategy());
+ if (outputIndex == 0) {
+ sourceConfig.setOutputSerializer(channel.getSerializer());
+ }
+ if (channel.getShipStrategyComparator() != null) {
+ sourceConfig.setOutputComparator(channel.getShipStrategyComparator(), outputIndex);
+ }
+
+ if (channel.getShipStrategy() == ShipStrategyType.PARTITION_RANGE) {
+
+ final DataDistribution dataDistribution = channel.getDataDistribution();
+ if(dataDistribution != null) {
+ sourceConfig.setOutputDataDistribution(dataDistribution, outputIndex);
+ } else {
+ throw new RuntimeException("Range partitioning requires data distribution");
+ // TODO: inject code and configuration for automatic histogram generation
+ }
+ }
+
+ // ---------------- configure the receiver -------------------
+ if (isBroadcast) {
+ targetConfig.addBroadcastInputToGroup(inputNumber);
+ } else {
+ targetConfig.addInputToGroup(inputNumber);
+ }
+
+ //----------------- connect source and target with edge ------------------------------
+
+ FlinkEdge edge;
+ ShipStrategyType shipStrategy = channel.getShipStrategy();
+ TypeSerializer<?> serializer = channel.getSerializer().getSerializer();
+ if ((shipStrategy == ShipStrategyType.FORWARD) || (shipStrategy == ShipStrategyType.NONE)) {
+ edge = new FlinkForwardEdge(sourceVertex, targetVertex, serializer);
+ // For forward edges, create as many tasks in upstream operator as in source operator
+ targetVertex.setParallelism(sourceVertex.getParallelism());
+ }
+ else if (shipStrategy == ShipStrategyType.BROADCAST) {
+ edge = new FlinkBroadcastEdge(sourceVertex, targetVertex, serializer);
+ }
+ else if (shipStrategy == ShipStrategyType.PARTITION_HASH) {
+ edge = new FlinkPartitionEdge(sourceVertex, targetVertex, serializer);
+ }
+ else {
+ throw new CompilerException("Ship strategy between nodes " + sourceVertex.getVertex().getName() + " and " + targetVertex.getVertex().getName() + " currently not supported");
+ }
+
+ // Tez-specific bookkeeping
+ // TODO: This probably will not work for vertices with multiple outputs
+ sourceVertex.addNumberOfSubTasksInOutput(targetVertex.getParallelism(), outputIndex);
+ targetVertex.addInput(sourceVertex, inputNumber);
+
+
+ edges.add(edge);
+ }
+
+ private void addLocalInfoFromChannelToConfig(Channel channel, TaskConfig config, int inputNum, boolean isBroadcastChannel) {
+ // serializer
+ if (isBroadcastChannel) {
+ config.setBroadcastInputSerializer(channel.getSerializer(), inputNum);
+
+ if (channel.getLocalStrategy() != LocalStrategy.NONE || (channel.getTempMode() != null && channel.getTempMode() != TempMode.NONE)) {
+ throw new CompilerException("Found local strategy or temp mode on a broadcast variable channel.");
+ } else {
+ return;
+ }
+ } else {
+ config.setInputSerializer(channel.getSerializer(), inputNum);
+ }
+
+ // local strategy
+ if (channel.getLocalStrategy() != LocalStrategy.NONE) {
+ config.setInputLocalStrategy(inputNum, channel.getLocalStrategy());
+ if (channel.getLocalStrategyComparator() != null) {
+ config.setInputComparator(channel.getLocalStrategyComparator(), inputNum);
+ }
+ }
+
+ assignLocalStrategyResources(channel, config, inputNum);
+
+ // materialization / caching
+ if (channel.getTempMode() != null) {
+ final TempMode tm = channel.getTempMode();
+
+ boolean needsMemory = false;
+ if (tm.breaksPipeline()) {
+ config.setInputAsynchronouslyMaterialized(inputNum, true);
+ needsMemory = true;
+ }
+ if (tm.isCached()) {
+ config.setInputCached(inputNum, true);
+ needsMemory = true;
+ }
+
+ if (needsMemory) {
+ // sanity check
+ if (tm == null || tm == TempMode.NONE || channel.getRelativeTempMemory() <= 0) {
+ throw new CompilerException("Bug in compiler: Inconsistent description of input materialization.");
+ }
+ config.setRelativeInputMaterializationMemory(inputNum, channel.getRelativeTempMemory());
+ }
+ }
+ }
+
+ private boolean containsSelfJoins () {
+ for (FlinkVertex v : vertices.values()) {
+ ArrayList<FlinkVertex> predecessors = new ArrayList<FlinkVertex>();
+ for (FlinkEdge e : edges) {
+ if (e.target == v) {
+ if (predecessors.contains(e.source)) {
+ return true;
+ }
+ predecessors.add(e.source);
+ }
+ }
+ }
+ return false;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/ConnectedComponentsStep.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/ConnectedComponentsStep.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/ConnectedComponentsStep.java
new file mode 100644
index 0000000..707fd47
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/ConnectedComponentsStep.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.examples;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.aggregation.Aggregations;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.examples.java.graph.util.ConnectedComponentsData;
+import org.apache.flink.util.Collector;
+
+
+public class ConnectedComponentsStep implements ProgramDescription {
+
+ // *************************************************************************
+ // PROGRAM
+ // *************************************************************************
+
+ public static void main(String... args) throws Exception {
+
+ if(!parseParameters(args)) {
+ return;
+ }
+
+ // set up execution environment
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ // read vertex and edge data
+ DataSet<Long> vertices = getVertexDataSet(env);
+ DataSet<Tuple2<Long, Long>> edges = getEdgeDataSet(env).flatMap(new UndirectEdge());
+
+ // assign the initial components (equal to the vertex id)
+ DataSet<Tuple2<Long, Long>> verticesWithInitialId = vertices.map(new DuplicateValue<Long>());
+
+ DataSet<Tuple2<Long,Long>> nextComponenets = verticesWithInitialId
+ .join(edges)
+ .where(0).equalTo(0)
+ .with(new NeighborWithComponentIDJoin())
+ .groupBy(0).aggregate(Aggregations.MIN, 1)
+ .join(verticesWithInitialId)
+ .where(0).equalTo(0)
+ .with(new ComponentIdFilter());
+
+
+ // emit result
+ if(fileOutput) {
+ nextComponenets.writeAsCsv(outputPath, "\n", " ");
+ } else {
+ nextComponenets.print();
+ }
+
+ // execute program
+ env.execute("Connected Components Example");
+ }
+
+ // *************************************************************************
+ // USER FUNCTIONS
+ // *************************************************************************
+
+ /**
+ * Function that turns a value into a 2-tuple where both fields are that value.
+ */
+ @ForwardedFields("*->f0")
+ public static final class DuplicateValue<T> implements MapFunction<T, Tuple2<T, T>> {
+
+ @Override
+ public Tuple2<T, T> map(T vertex) {
+ return new Tuple2<T, T>(vertex, vertex);
+ }
+ }
+
+ /**
+ * Undirected edges by emitting for each input edge the input edges itself and an inverted version.
+ */
+ public static final class UndirectEdge implements FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+ Tuple2<Long, Long> invertedEdge = new Tuple2<Long, Long>();
+
+ @Override
+ public void flatMap(Tuple2<Long, Long> edge, Collector<Tuple2<Long, Long>> out) {
+ invertedEdge.f0 = edge.f1;
+ invertedEdge.f1 = edge.f0;
+ out.collect(edge);
+ out.collect(invertedEdge);
+ }
+ }
+
+ /**
+ * UDF that joins a (Vertex-ID, Component-ID) pair that represents the current component that
+ * a vertex is associated with, with a (Source-Vertex-ID, Target-VertexID) edge. The function
+ * produces a (Target-vertex-ID, Component-ID) pair.
+ */
+ @ForwardedFieldsFirst("f1->f1")
+ @ForwardedFieldsSecond("f1->f0")
+ public static final class NeighborWithComponentIDJoin implements JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
+
+ @Override
+ public Tuple2<Long, Long> join(Tuple2<Long, Long> vertexWithComponent, Tuple2<Long, Long> edge) {
+ return new Tuple2<Long, Long>(edge.f1, vertexWithComponent.f1);
+ }
+ }
+
+
+
+ @ForwardedFieldsFirst("*")
+ public static final class ComponentIdFilter implements FlatJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
+
+ @Override
+ public void join(Tuple2<Long, Long> candidate, Tuple2<Long, Long> old, Collector<Tuple2<Long, Long>> out) {
+ if (candidate.f1 < old.f1) {
+ out.collect(candidate);
+ }
+ }
+ }
+
+
+
+ @Override
+ public String getDescription() {
+ return "Parameters: <vertices-path> <edges-path> <result-path> <max-number-of-iterations>";
+ }
+
+ // *************************************************************************
+ // UTIL METHODS
+ // *************************************************************************
+
+ private static boolean fileOutput = false;
+ private static String verticesPath = null;
+ private static String edgesPath = null;
+ private static String outputPath = null;
+ private static int maxIterations = 10;
+
+ private static boolean parseParameters(String[] programArguments) {
+
+ if(programArguments.length > 0) {
+ // parse input arguments
+ fileOutput = true;
+ if(programArguments.length == 4) {
+ verticesPath = programArguments[0];
+ edgesPath = programArguments[1];
+ outputPath = programArguments[2];
+ maxIterations = Integer.parseInt(programArguments[3]);
+ } else {
+ System.err.println("Usage: ConnectedComponents <vertices path> <edges path> <result path> <max number of iterations>");
+ return false;
+ }
+ } else {
+ System.out.println("Executing Connected Components example with default parameters and built-in default data.");
+ System.out.println(" Provide parameters to read input data from files.");
+ System.out.println(" See the documentation for the correct format of input files.");
+ System.out.println(" Usage: ConnectedComponents <vertices path> <edges path> <result path> <max number of iterations>");
+ }
+ return true;
+ }
+
+ private static DataSet<Long> getVertexDataSet(ExecutionEnvironment env) {
+
+ if(fileOutput) {
+ return env.readCsvFile(verticesPath).types(Long.class)
+ .map(
+ new MapFunction<Tuple1<Long>, Long>() {
+ public Long map(Tuple1<Long> value) { return value.f0; }
+ });
+ } else {
+ return ConnectedComponentsData.getDefaultVertexDataSet(env);
+ }
+ }
+
+ private static DataSet<Tuple2<Long, Long>> getEdgeDataSet(ExecutionEnvironment env) {
+
+ if(fileOutput) {
+ return env.readCsvFile(edgesPath).fieldDelimiter(' ').types(Long.class, Long.class);
+ } else {
+ return ConnectedComponentsData.getDefaultEdgeDataSet(env);
+ }
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/ExampleDriver.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/ExampleDriver.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/ExampleDriver.java
new file mode 100644
index 0000000..c65fb69
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/ExampleDriver.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.examples;
+
+import org.apache.hadoop.util.ProgramDriver;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.client.Progress;
+import org.apache.tez.dag.api.client.StatusGetOpts;
+import org.apache.tez.dag.api.client.VertexStatus;
+
+import java.io.IOException;
+import java.text.DecimalFormat;
+import java.util.EnumSet;
+import java.util.Set;
+
+public class ExampleDriver {
+
+ private static final DecimalFormat formatter = new DecimalFormat("###.##%");
+
+ public static void main(String [] args){
+ int exitCode = -1;
+ ProgramDriver pgd = new ProgramDriver();
+ try {
+ pgd.addClass("wc", WordCount.class,
+ "Wordcount");
+ pgd.addClass("tpch3", TPCHQuery3.class,
+ "Modified TPC-H 3 query");
+ pgd.addClass("tc", TransitiveClosureNaiveStep.class,
+ "One step of transitive closure");
+ pgd.addClass("pr", PageRankBasicStep.class,
+ "One step of PageRank");
+ pgd.addClass("cc", ConnectedComponentsStep.class,
+ "One step of connected components");
+ exitCode = pgd.run(args);
+ } catch(Throwable e){
+ e.printStackTrace();
+ }
+ System.exit(exitCode);
+ }
+
+ public static void printDAGStatus(DAGClient dagClient, String[] vertexNames)
+ throws IOException, TezException {
+ printDAGStatus(dagClient, vertexNames, false, false);
+ }
+
+ public static void printDAGStatus(DAGClient dagClient, String[] vertexNames, boolean displayDAGCounters, boolean displayVertexCounters)
+ throws IOException, TezException {
+ Set<StatusGetOpts> opts = EnumSet.of(StatusGetOpts.GET_COUNTERS);
+ DAGStatus dagStatus = dagClient.getDAGStatus(
+ (displayDAGCounters ? opts : null));
+ Progress progress = dagStatus.getDAGProgress();
+ double vProgressFloat = 0.0f;
+ if (progress != null) {
+ System.out.println("");
+ System.out.println("DAG: State: "
+ + dagStatus.getState()
+ + " Progress: "
+ + (progress.getTotalTaskCount() < 0 ? formatter.format(0.0f) :
+ formatter.format((double)(progress.getSucceededTaskCount())
+ /progress.getTotalTaskCount())));
+ for (String vertexName : vertexNames) {
+ VertexStatus vStatus = dagClient.getVertexStatus(vertexName,
+ (displayVertexCounters ? opts : null));
+ if (vStatus == null) {
+ System.out.println("Could not retrieve status for vertex: "
+ + vertexName);
+ continue;
+ }
+ Progress vProgress = vStatus.getProgress();
+ if (vProgress != null) {
+ vProgressFloat = 0.0f;
+ if (vProgress.getTotalTaskCount() == 0) {
+ vProgressFloat = 1.0f;
+ } else if (vProgress.getTotalTaskCount() > 0) {
+ vProgressFloat = (double)vProgress.getSucceededTaskCount()
+ /vProgress.getTotalTaskCount();
+ }
+ System.out.println("VertexStatus:"
+ + " VertexName: "
+ + (vertexName.equals("ivertex1") ? "intermediate-reducer"
+ : vertexName)
+ + " Progress: " + formatter.format(vProgressFloat));
+ }
+ if (displayVertexCounters) {
+ TezCounters counters = vStatus.getVertexCounters();
+ if (counters != null) {
+ System.out.println("Vertex Counters for " + vertexName + ": "
+ + counters);
+ }
+ }
+ }
+ }
+ if (displayDAGCounters) {
+ TezCounters counters = dagStatus.getDAGCounters();
+ if (counters != null) {
+ System.out.println("DAG Counters: " + counters);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/PageRankBasicStep.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/PageRankBasicStep.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/PageRankBasicStep.java
new file mode 100644
index 0000000..031893d
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/PageRankBasicStep.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.examples;
+
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.examples.java.graph.util.PageRankData;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+
+import static org.apache.flink.api.java.aggregation.Aggregations.SUM;
+
+public class PageRankBasicStep {
+
+ private static final double DAMPENING_FACTOR = 0.85;
+ private static final double EPSILON = 0.0001;
+
+ // *************************************************************************
+ // PROGRAM
+ // *************************************************************************
+
+ public static void main(String[] args) throws Exception {
+
+ if(!parseParameters(args)) {
+ return;
+ }
+
+ // set up execution environment
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ // get input data
+ DataSet<Long> pagesInput = getPagesDataSet(env);
+ DataSet<Tuple2<Long, Long>> linksInput = getLinksDataSet(env);
+
+ // assign initial rank to pages
+ DataSet<Tuple2<Long, Double>> pagesWithRanks = pagesInput.
+ map(new RankAssigner((1.0d / numPages)));
+
+ // build adjacency list from link input
+ DataSet<Tuple2<Long, Long[]>> adjacencyListInput =
+ linksInput.groupBy(0).reduceGroup(new BuildOutgoingEdgeList());
+
+ DataSet<Tuple2<Long, Double>> newRanks = pagesWithRanks
+ .join(adjacencyListInput).where(0).equalTo(0)
+ .flatMap(new JoinVertexWithEdgesMatch())
+ .groupBy(0).aggregate(SUM, 1)
+ .map(new Dampener(DAMPENING_FACTOR, numPages));
+
+
+ // emit result
+ if(fileOutput) {
+ newRanks.writeAsCsv(outputPath, "\n", " ");
+ } else {
+ newRanks.print();
+ }
+
+ // execute program
+ env.execute("Basic Page Rank Example");
+
+ }
+
+ // *************************************************************************
+ // USER FUNCTIONS
+ // *************************************************************************
+
+ /**
+ * A map function that assigns an initial rank to all pages.
+ */
+ public static final class RankAssigner implements MapFunction<Long, Tuple2<Long, Double>> {
+ Tuple2<Long, Double> outPageWithRank;
+
+ public RankAssigner(double rank) {
+ this.outPageWithRank = new Tuple2<Long, Double>(-1l, rank);
+ }
+
+ @Override
+ public Tuple2<Long, Double> map(Long page) {
+ outPageWithRank.f0 = page;
+ return outPageWithRank;
+ }
+ }
+
+ /**
+ * A reduce function that takes a sequence of edges and builds the adjacency list for the vertex where the edges
+ * originate. Run as a pre-processing step.
+ */
+ @ForwardedFields("0")
+ public static final class BuildOutgoingEdgeList implements GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long[]>> {
+
+ private final ArrayList<Long> neighbors = new ArrayList<Long>();
+
+ @Override
+ public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long[]>> out) {
+ neighbors.clear();
+ Long id = 0L;
+
+ for (Tuple2<Long, Long> n : values) {
+ id = n.f0;
+ neighbors.add(n.f1);
+ }
+ out.collect(new Tuple2<Long, Long[]>(id, neighbors.toArray(new Long[neighbors.size()])));
+ }
+ }
+
+ /**
+ * Join function that distributes a fraction of a vertex's rank to all neighbors.
+ */
+ public static final class JoinVertexWithEdgesMatch implements FlatMapFunction<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Long[]>>, Tuple2<Long, Double>> {
+
+ @Override
+ public void flatMap(Tuple2<Tuple2<Long, Double>, Tuple2<Long, Long[]>> value, Collector<Tuple2<Long, Double>> out){
+ Long[] neigbors = value.f1.f1;
+ double rank = value.f0.f1;
+ double rankToDistribute = rank / ((double) neigbors.length);
+
+ for (int i = 0; i < neigbors.length; i++) {
+ out.collect(new Tuple2<Long, Double>(neigbors[i], rankToDistribute));
+ }
+ }
+ }
+
+ /**
+ * The function that applies the page rank dampening formula
+ */
+ @ForwardedFields("0")
+ public static final class Dampener implements MapFunction<Tuple2<Long,Double>, Tuple2<Long,Double>> {
+
+ private final double dampening;
+ private final double randomJump;
+
+ public Dampener(double dampening, double numVertices) {
+ this.dampening = dampening;
+ this.randomJump = (1 - dampening) / numVertices;
+ }
+
+ @Override
+ public Tuple2<Long, Double> map(Tuple2<Long, Double> value) {
+ value.f1 = (value.f1 * dampening) + randomJump;
+ return value;
+ }
+ }
+
+ /**
+ * Filter that filters vertices where the rank difference is below a threshold.
+ */
+ public static final class EpsilonFilter implements FilterFunction<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>>> {
+
+ @Override
+ public boolean filter(Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>> value) {
+ return Math.abs(value.f0.f1 - value.f1.f1) > EPSILON;
+ }
+ }
+
+ // *************************************************************************
+ // UTIL METHODS
+ // *************************************************************************
+
+ private static boolean fileOutput = false;
+ private static String pagesInputPath = null;
+ private static String linksInputPath = null;
+ private static String outputPath = null;
+ private static long numPages = 0;
+ private static int maxIterations = 10;
+
+ private static boolean parseParameters(String[] args) {
+
+ if(args.length > 0) {
+ if(args.length == 5) {
+ fileOutput = true;
+ pagesInputPath = args[0];
+ linksInputPath = args[1];
+ outputPath = args[2];
+ numPages = Integer.parseInt(args[3]);
+ maxIterations = Integer.parseInt(args[4]);
+ } else {
+ System.err.println("Usage: PageRankBasic <pages path> <links path> <output path> <num pages> <num iterations>");
+ return false;
+ }
+ } else {
+ System.out.println("Executing PageRank Basic example with default parameters and built-in default data.");
+ System.out.println(" Provide parameters to read input data from files.");
+ System.out.println(" See the documentation for the correct format of input files.");
+ System.out.println(" Usage: PageRankBasic <pages path> <links path> <output path> <num pages> <num iterations>");
+
+ numPages = PageRankData.getNumberOfPages();
+ }
+ return true;
+ }
+
+ private static DataSet<Long> getPagesDataSet(ExecutionEnvironment env) {
+ if(fileOutput) {
+ return env
+ .readCsvFile(pagesInputPath)
+ .fieldDelimiter(' ')
+ .lineDelimiter("\n")
+ .types(Long.class)
+ .map(new MapFunction<Tuple1<Long>, Long>() {
+ @Override
+ public Long map(Tuple1<Long> v) { return v.f0; }
+ });
+ } else {
+ return PageRankData.getDefaultPagesDataSet(env);
+ }
+ }
+
+ private static DataSet<Tuple2<Long, Long>> getLinksDataSet(ExecutionEnvironment env) {
+ if(fileOutput) {
+ return env.readCsvFile(linksInputPath)
+ .fieldDelimiter(' ')
+ .lineDelimiter("\n")
+ .types(Long.class, Long.class);
+ } else {
+ return PageRankData.getDefaultEdgeDataSet(env);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/TPCHQuery3.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/TPCHQuery3.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/TPCHQuery3.java
new file mode 100644
index 0000000..d61f80e
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/TPCHQuery3.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.examples;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.aggregation.Aggregations;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.tez.client.RemoteTezEnvironment;
+
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
public class TPCHQuery3 {

	// *************************************************************************
	//     PROGRAM
	// *************************************************************************

	/**
	 * Executes TPC-H Query 3 on a remote Tez environment: filters customers in
	 * the "AUTOMOBILE" market segment, orders placed before 1995-03-12, and
	 * line items shipped after that date; joins them, sums revenue per
	 * (orderkey, orderdate, shippriority) group, and writes the result as
	 * '|'-delimited CSV to the given output path.
	 */
	public static void main(String[] args) throws Exception {

		if(!parseParameters(args)) {
			return;
		}

		final RemoteTezEnvironment env = RemoteTezEnvironment.create();
		// NOTE(review): parallelism is hard-coded to 400 for this benchmark run;
		// adjust to the target cluster size.
		env.setParallelism(400);


		// get input data
		DataSet<Lineitem> lineitems = getLineitemDataSet(env);
		DataSet<Order> orders = getOrdersDataSet(env);
		DataSet<Customer> customers = getCustomerDataSet(env);

		// Filter market segment "AUTOMOBILE"
		customers = customers.filter(
				new FilterFunction<Customer>() {
					@Override
					public boolean filter(Customer c) {
						return c.getMktsegment().equals("AUTOMOBILE");
					}
				});

		// Filter all Orders with o_orderdate < 12.03.1995
		orders = orders.filter(
				new FilterFunction<Order>() {
					// the cutoff date is parsed once per function instance;
					// the anonymous class' instance initializer may throw
					// ParseException because main declares 'throws Exception'
					private final DateFormat format = new SimpleDateFormat("yyyy-MM-dd");
					private final Date date = format.parse("1995-03-12");

					@Override
					public boolean filter(Order o) throws ParseException {
						return format.parse(o.getOrderdate()).before(date);
					}
				});

		// Filter all Lineitems with l_shipdate > 12.03.1995
		lineitems = lineitems.filter(
				new FilterFunction<Lineitem>() {
					private final DateFormat format = new SimpleDateFormat("yyyy-MM-dd");
					private final Date date = format.parse("1995-03-12");

					@Override
					public boolean filter(Lineitem l) throws ParseException {
						return format.parse(l.getShipdate()).after(date);
					}
				});

		// Join customers with orders and package them into a ShippingPriorityItem
		// (revenue starts at 0.0 and is filled in by the second join)
		DataSet<ShippingPriorityItem> customerWithOrders =
				customers.join(orders).where(0).equalTo(1)
						.with(
								new JoinFunction<Customer, Order, ShippingPriorityItem>() {
									@Override
									public ShippingPriorityItem join(Customer c, Order o) {
										return new ShippingPriorityItem(o.getOrderKey(), 0.0, o.getOrderdate(),
												o.getShippriority());
									}
								});

		// Join the last join result with Lineitems
		DataSet<ShippingPriorityItem> result =
				customerWithOrders.join(lineitems).where(0).equalTo(0)
						.with(
								new JoinFunction<ShippingPriorityItem, Lineitem, ShippingPriorityItem>() {
									@Override
									public ShippingPriorityItem join(ShippingPriorityItem i, Lineitem l) {
										// per-lineitem revenue: extendedprice * (1 - discount)
										i.setRevenue(l.getExtendedprice() * (1 - l.getDiscount()));
										return i;
									}
								})
						// Group by l_orderkey, o_orderdate and o_shippriority and compute revenue sum
						.groupBy(0, 2, 3)
						.aggregate(Aggregations.SUM, 1);

		// emit result
		result.writeAsCsv(outputPath, "\n", "|");

		// execute program
		env.registerMainClass(TPCHQuery3.class);
		env.execute("TPCH Query 3 Example");

	}

	// *************************************************************************
	//     DATA TYPES
	// *************************************************************************

	/** Projected TPC-H lineitem row: (orderkey, extendedprice, discount, shipdate). */
	public static class Lineitem extends Tuple4<Integer, Double, Double, String> {

		public Integer getOrderkey() { return this.f0; }
		public Double getDiscount() { return this.f2; }
		public Double getExtendedprice() { return this.f1; }
		public String getShipdate() { return this.f3; }
	}

	/** Projected TPC-H customer row: (custkey, mktsegment). */
	public static class Customer extends Tuple2<Integer, String> {

		public Integer getCustKey() { return this.f0; }
		public String getMktsegment() { return this.f1; }
	}

	/** Projected TPC-H orders row: (orderkey, custkey, orderdate, shippriority). */
	public static class Order extends Tuple4<Integer, Integer, String, Integer> {

		public Integer getOrderKey() { return this.f0; }
		public Integer getCustKey() { return this.f1; }
		public String getOrderdate() { return this.f2; }
		public Integer getShippriority() { return this.f3; }
	}

	/** Query result row: (orderkey, revenue, orderdate, shippriority). */
	public static class ShippingPriorityItem extends Tuple4<Integer, Double, String, Integer> {

		// no-arg constructor required for Flink tuple serialization
		public ShippingPriorityItem() { }

		public ShippingPriorityItem(Integer o_orderkey, Double revenue,
				String o_orderdate, Integer o_shippriority) {
			this.f0 = o_orderkey;
			this.f1 = revenue;
			this.f2 = o_orderdate;
			this.f3 = o_shippriority;
		}

		public Integer getOrderkey() { return this.f0; }
		public void setOrderkey(Integer orderkey) { this.f0 = orderkey; }
		public Double getRevenue() { return this.f1; }
		public void setRevenue(Double revenue) { this.f1 = revenue; }

		public String getOrderdate() { return this.f2; }
		public Integer getShippriority() { return this.f3; }
	}

	// *************************************************************************
	//     UTIL METHODS
	// *************************************************************************

	private static String lineitemPath;
	private static String customerPath;
	private static String ordersPath;
	private static String outputPath;

	/**
	 * Parses the four required paths; prints usage and returns false when
	 * arguments are missing (this query has no built-in default data).
	 */
	private static boolean parseParameters(String[] programArguments) {

		if(programArguments.length > 0) {
			if(programArguments.length == 4) {
				lineitemPath = programArguments[0];
				customerPath = programArguments[1];
				ordersPath = programArguments[2];
				outputPath = programArguments[3];
			} else {
				System.err.println("Usage: TPCHQuery3 <lineitem-csv path> <customer-csv path> <orders-csv path> <result path>");
				return false;
			}
		} else {
			System.err.println("This program expects data from the TPC-H benchmark as input data.\n" +
					" Due to legal restrictions, we can not ship generated data.\n" +
					" You can find the TPC-H data generator at http://www.tpc.org/tpch/.\n" +
					" Usage: TPCHQuery3 <lineitem-csv path> <customer-csv path> <orders-csv path> <result path>");
			return false;
		}
		return true;
	}

	// include-field masks below select the projected columns from the full
	// TPC-H schemas; '1' keeps a column, '0' drops it
	private static DataSet<Lineitem> getLineitemDataSet(ExecutionEnvironment env) {
		return env.readCsvFile(lineitemPath)
				.fieldDelimiter('|')
				.includeFields("1000011000100000")
				.tupleType(Lineitem.class);
	}

	private static DataSet<Customer> getCustomerDataSet(ExecutionEnvironment env) {
		return env.readCsvFile(customerPath)
				.fieldDelimiter('|')
				.includeFields("10000010")
				.tupleType(Customer.class);
	}

	private static DataSet<Order> getOrdersDataSet(ExecutionEnvironment env) {
		return env.readCsvFile(ordersPath)
				.fieldDelimiter('|')
				.includeFields("110010010")
				.tupleType(Order.class);
	}

}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/TransitiveClosureNaiveStep.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/TransitiveClosureNaiveStep.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/TransitiveClosureNaiveStep.java
new file mode 100644
index 0000000..b014c3e
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/TransitiveClosureNaiveStep.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.examples;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.examples.java.graph.util.ConnectedComponentsData;
+import org.apache.flink.util.Collector;
+
+
+/*
+ * NOTE:
+ * This program is currently supposed to throw a Compiler Exception due to TEZ-1190
+ */
+
+public class TransitiveClosureNaiveStep implements ProgramDescription {
+
+
+ public static void main (String... args) throws Exception{
+
+ if (!parseParameters(args)) {
+ return;
+ }
+
+ // set up execution environment
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple2<Long, Long>> edges = getEdgeDataSet(env);
+
+ DataSet<Tuple2<Long,Long>> nextPaths = edges
+ .join(edges)
+ .where(1)
+ .equalTo(0)
+ .with(new JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>() {
+ @Override
+ /**
+ left: Path (z,x) - x is reachable by z
+ right: Edge (x,y) - edge x-->y exists
+ out: Path (z,y) - y is reachable by z
+ */
+ public Tuple2<Long, Long> join(Tuple2<Long, Long> left, Tuple2<Long, Long> right) throws Exception {
+ return new Tuple2<Long, Long>(
+ new Long(left.f0),
+ new Long(right.f1));
+ }
+ })
+ .union(edges)
+ .groupBy(0, 1)
+ .reduceGroup(new GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
+ @Override
+ public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) throws Exception {
+ out.collect(values.iterator().next());
+ }
+ });
+
+ // emit result
+ if (fileOutput) {
+ nextPaths.writeAsCsv(outputPath, "\n", " ");
+ } else {
+ nextPaths.print();
+ }
+
+ // execute program
+ env.execute("Transitive Closure Example");
+
+ }
+
+ @Override
+ public String getDescription() {
+ return "Parameters: <edges-path> <result-path> <max-number-of-iterations>";
+ }
+
+ // *************************************************************************
+ // UTIL METHODS
+ // *************************************************************************
+
+ private static boolean fileOutput = false;
+ private static String edgesPath = null;
+ private static String outputPath = null;
+ private static int maxIterations = 10;
+
+ private static boolean parseParameters(String[] programArguments) {
+
+ if (programArguments.length > 0) {
+ // parse input arguments
+ fileOutput = true;
+ if (programArguments.length == 3) {
+ edgesPath = programArguments[0];
+ outputPath = programArguments[1];
+ maxIterations = Integer.parseInt(programArguments[2]);
+ } else {
+ System.err.println("Usage: TransitiveClosure <edges path> <result path> <max number of iterations>");
+ return false;
+ }
+ } else {
+ System.out.println("Executing TransitiveClosure example with default parameters and built-in default data.");
+ System.out.println(" Provide parameters to read input data from files.");
+ System.out.println(" See the documentation for the correct format of input files.");
+ System.out.println(" Usage: TransitiveClosure <edges path> <result path> <max number of iterations>");
+ }
+ return true;
+ }
+
+
+ private static DataSet<Tuple2<Long, Long>> getEdgeDataSet(ExecutionEnvironment env) {
+
+ if(fileOutput) {
+ return env.readCsvFile(edgesPath).fieldDelimiter(' ').types(Long.class, Long.class);
+ } else {
+ return ConnectedComponentsData.getDefaultEdgeDataSet(env);
+ }
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/WordCount.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/WordCount.java
new file mode 100644
index 0000000..e758156
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/WordCount.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.examples;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.tez.client.RemoteTezEnvironment;
+import org.apache.flink.util.Collector;
+
+public class WordCount {
+
+ // *************************************************************************
+ // PROGRAM
+ // *************************************************************************
+
+ public static void main(String[] args) throws Exception {
+
+ if(!parseParameters(args)) {
+ return;
+ }
+
+ // set up the execution environment
+ final RemoteTezEnvironment env = RemoteTezEnvironment.create();
+ env.setParallelism(8);
+
+ // get input data
+ DataSet<String> text = getTextDataSet(env);
+
+ DataSet<Tuple2<String, Integer>> counts =
+ // split up the lines in pairs (2-tuples) containing: (word,1)
+ text.flatMap(new Tokenizer())
+ // group by the tuple field "0" and sum up tuple field "1"
+ .groupBy(0)
+ .sum(1);
+
+ // emit result
+ if(fileOutput) {
+ counts.writeAsCsv(outputPath, "\n", " ");
+ } else {
+ counts.print();
+ }
+
+ // execute program
+ env.registerMainClass(WordCount.class);
+ env.execute("WordCount Example");
+ }
+
+ // *************************************************************************
+ // USER FUNCTIONS
+ // *************************************************************************
+
+ /**
+ * Implements the string tokenizer that splits sentences into words as a user-defined
+ * FlatMapFunction. The function takes a line (String) and splits it into
+ * multiple pairs in the form of "(word,1)" (Tuple2<String, Integer>).
+ */
+ public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
+
+ @Override
+ public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
+ // normalize and split the line
+ String[] tokens = value.toLowerCase().split("\\W+");
+
+ // emit the pairs
+ for (String token : tokens) {
+ if (token.length() > 0) {
+ out.collect(new Tuple2<String, Integer>(token, 1));
+ }
+ }
+ }
+ }
+
+ // *************************************************************************
+ // UTIL METHODS
+ // *************************************************************************
+
+ private static boolean fileOutput = false;
+ private static String textPath;
+ private static String outputPath;
+
+ private static boolean parseParameters(String[] args) {
+
+ if(args.length > 0) {
+ // parse input arguments
+ fileOutput = true;
+ if(args.length == 2) {
+ textPath = args[0];
+ outputPath = args[1];
+ } else {
+ System.err.println("Usage: WordCount <text path> <result path>");
+ return false;
+ }
+ } else {
+ System.out.println("Executing WordCount example with built-in default data.");
+ System.out.println(" Provide parameters to read input data from a file.");
+ System.out.println(" Usage: WordCount <text path> <result path>");
+ }
+ return true;
+ }
+
+ private static DataSet<String> getTextDataSet(ExecutionEnvironment env) {
+ if(fileOutput) {
+ // read the text file from given input path
+ return env.readTextFile(textPath);
+ } else {
+ // get default test text data
+ return WordCountData.getDefaultTextLineDataSet(env);
+ }
+ }
+}