You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/11/13 16:06:15 UTC
[2/5] incubator-flink git commit: [streaming]
StreamExecutionEnvironment rework + user class loader fix for cluster
deployment
[streaming] StreamExecutionEnvironment rework + user class loader fix for cluster deployment
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/6867f9b9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/6867f9b9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/6867f9b9
Branch: refs/heads/master
Commit: 6867f9b93ec1ad9a627450c4fbd0b5ff98ef6148
Parents: c6dd9b1
Author: Gyula Fora <gy...@apache.org>
Authored: Wed Nov 12 01:11:36 2014 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Thu Nov 13 15:24:04 2014 +0100
----------------------------------------------------------------------
.../flink/streaming/api/JobGraphBuilder.java | 18 -----
.../flink/streaming/api/StreamConfig.java | 73 +++++++++++--------
.../api/environment/LocalStreamEnvironment.java | 17 ++---
.../environment/RemoteStreamEnvironment.java | 40 ++++++-----
.../environment/StreamContextEnvironment.java | 75 ++++++++++++++++++++
.../environment/StreamExecutionEnvironment.java | 69 +++++-------------
.../api/streamvertex/CoStreamVertex.java | 9 ++-
.../api/streamvertex/InputHandler.java | 7 +-
.../api/streamvertex/OutputHandler.java | 8 ++-
.../api/streamvertex/StreamVertex.java | 9 ++-
.../flink/streaming/util/ClusterUtil.java | 40 +----------
.../api/streamvertex/StreamVertexTest.java | 5 --
.../examples/iteration/IterateExample.java | 4 +-
.../client/program/ContextEnvironment.java | 4 ++
.../flink/client/program/JobWithJars.java | 2 +-
15 files changed, 193 insertions(+), 187 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6867f9b9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
index 5a8fd22..df59be1 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
@@ -80,8 +80,6 @@ public class JobGraphBuilder {
private Map<String, Long> iterationWaitTime;
private Map<String, Map<String, OperatorState<?>>> operatorStates;
- private int degreeOfParallelism;
- private int executionParallelism;
/**
* Creates an new {@link JobGraph} with the given name. A JobGraph is a DAG
@@ -120,22 +118,6 @@ public class JobGraphBuilder {
}
}
- public int getDefaultParallelism() {
- return degreeOfParallelism;
- }
-
- public void setDefaultParallelism(int defaultParallelism) {
- this.degreeOfParallelism = defaultParallelism;
- }
-
- public int getExecutionParallelism() {
- return executionParallelism;
- }
-
- public void setExecutionParallelism(int executionParallelism) {
- this.executionParallelism = executionParallelism;
- }
-
/**
* Adds a vertex to the streaming JobGraph with the given parameters
*
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6867f9b9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
index 7da6265..3dba376 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
@@ -34,6 +34,7 @@ import org.apache.flink.streaming.partitioner.ShufflePartitioner;
import org.apache.flink.streaming.partitioner.StreamPartitioner;
import org.apache.flink.streaming.state.OperatorState;
import org.apache.flink.streaming.util.serialization.TypeWrapper;
+import org.apache.flink.util.InstantiationUtil;
public class StreamConfig {
private static final String INPUT_TYPE = "inputType_";
@@ -98,20 +99,20 @@ public class StreamConfig {
setTypeWrapper(TYPE_WRAPPER_OUT_2, typeWrapper);
}
- public <T> TypeInformation<T> getTypeInfoIn1() {
- return getTypeInfo(TYPE_WRAPPER_IN_1);
+ public <T> TypeInformation<T> getTypeInfoIn1(ClassLoader cl) {
+ return getTypeInfo(TYPE_WRAPPER_IN_1, cl);
}
- public <T> TypeInformation<T> getTypeInfoIn2() {
- return getTypeInfo(TYPE_WRAPPER_IN_2);
+ public <T> TypeInformation<T> getTypeInfoIn2(ClassLoader cl) {
+ return getTypeInfo(TYPE_WRAPPER_IN_2, cl);
}
- public <T> TypeInformation<T> getTypeInfoOut1() {
- return getTypeInfo(TYPE_WRAPPER_OUT_1);
+ public <T> TypeInformation<T> getTypeInfoOut1(ClassLoader cl) {
+ return getTypeInfo(TYPE_WRAPPER_OUT_1, cl);
}
- public <T> TypeInformation<T> getTypeInfoOut2() {
- return getTypeInfo(TYPE_WRAPPER_OUT_2);
+ public <T> TypeInformation<T> getTypeInfoOut2(ClassLoader cl) {
+ return getTypeInfo(TYPE_WRAPPER_OUT_2, cl);
}
private void setTypeWrapper(String key, TypeWrapper<?> typeWrapper) {
@@ -119,18 +120,17 @@ public class StreamConfig {
}
@SuppressWarnings("unchecked")
- private <T> TypeInformation<T> getTypeInfo(String key) {
- byte[] serializedWrapper = config.getBytes(key, null);
+ private <T> TypeInformation<T> getTypeInfo(String key, ClassLoader cl) {
- if (serializedWrapper == null) {
- throw new RuntimeException("TypeSerializationWrapper must be set");
+ TypeWrapper<T> typeWrapper;
+ try {
+ typeWrapper = (TypeWrapper<T>) InstantiationUtil.readObjectFromConfig(this.config, key,
+ cl);
+ } catch (Exception e) {
+ throw new RuntimeException("Cannot load typeinfo");
}
-
- TypeWrapper<T> typeWrapper = (TypeWrapper<T>) SerializationUtils
- .deserialize(serializedWrapper);
if (typeWrapper != null) {
return typeWrapper.getTypeInfo();
-
} else {
return null;
}
@@ -166,9 +166,10 @@ public class StreamConfig {
}
}
- public <T> T getUserInvokable() {
+ @SuppressWarnings({ "unchecked" })
+ public <T> T getUserInvokable(ClassLoader cl) {
try {
- return SerializationUtils.deserialize(config.getBytes(SERIALIZEDUDF, null));
+ return (T) InstantiationUtil.readObjectFromConfig(this.config, SERIALIZEDUDF, cl);
} catch (Exception e) {
throw new StreamVertexException("Cannot instantiate user function", e);
}
@@ -189,10 +190,10 @@ public class StreamConfig {
}
}
- public Object getFunction() {
+ public Object getFunction(ClassLoader cl) {
try {
- return SerializationUtils.deserialize(config.getBytes(FUNCTION, null));
- } catch (SerializationException e) {
+ return InstantiationUtil.readObjectFromConfig(this.config, FUNCTION, cl);
+ } catch (Exception e) {
throw new RuntimeException("Cannot deserialize invokable object", e);
}
}
@@ -216,9 +217,11 @@ public class StreamConfig {
}
}
- public <T> OutputSelector<T> getOutputSelector() {
+ @SuppressWarnings("unchecked")
+ public <T> OutputSelector<T> getOutputSelector(ClassLoader cl) {
try {
- return SerializationUtils.deserialize(config.getBytes(OUTPUT_SELECTOR, null));
+ return (OutputSelector<T>) InstantiationUtil.readObjectFromConfig(this.config,
+ OUTPUT_SELECTOR, cl);
} catch (Exception e) {
throw new StreamVertexException("Cannot deserialize and instantiate OutputSelector", e);
}
@@ -254,10 +257,16 @@ public class StreamConfig {
SerializationUtils.serialize(partitionerObject));
}
- public <T> StreamPartitioner<T> getPartitioner(int outputIndex) throws ClassNotFoundException,
- IOException {
- return SerializationUtils.deserialize(config.getBytes(PARTITIONER_OBJECT + outputIndex,
- SerializationUtils.serialize(new ShufflePartitioner<T>())));
+ public <T> StreamPartitioner<T> getPartitioner(ClassLoader cl, int outputIndex)
+ throws ClassNotFoundException, IOException {
+ @SuppressWarnings("unchecked")
+ StreamPartitioner<T> partitioner = (StreamPartitioner<T>) InstantiationUtil
+ .readObjectFromConfig(this.config, PARTITIONER_OBJECT + outputIndex, cl);
+ if (partitioner != null) {
+ return partitioner;
+ } else {
+ return new ShufflePartitioner<T>();
+ }
}
public void setSelectAll(int outputIndex, Boolean selectAll) {
@@ -323,8 +332,14 @@ public class StreamConfig {
config.setBytes(OPERATOR_STATES, SerializationUtils.serialize((Serializable) states));
}
- public Map<String, OperatorState<?>> getOperatorStates() {
- return SerializationUtils.deserialize(config.getBytes(OPERATOR_STATES, null));
+ @SuppressWarnings("unchecked")
+ public Map<String, OperatorState<?>> getOperatorStates(ClassLoader cl) {
+ try {
+ return (Map<String, OperatorState<?>>) InstantiationUtil.readObjectFromConfig(
+ this.config, OPERATOR_STATES, cl);
+ } catch (Exception e) {
+ throw new RuntimeException("Could not load operator state");
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6867f9b9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
index 5c0f555..505f0e7 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
@@ -21,13 +21,15 @@ import org.apache.flink.streaming.util.ClusterUtil;
public class LocalStreamEnvironment extends StreamExecutionEnvironment {
+ protected static ClassLoader userClassLoader;
+
/**
* Executes the JobGraph of the on a mini cluster of CLusterUtil with a
* default name.
*/
@Override
public void execute() throws Exception {
- ClusterUtil.runOnMiniCluster(this.jobGraphBuilder.getJobGraph(), getExecutionParallelism());
+ ClusterUtil.runOnMiniCluster(this.jobGraphBuilder.getJobGraph(), getDegreeOfParallelism());
}
/**
@@ -39,19 +41,12 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment {
*/
@Override
public void execute(String jobName) throws Exception {
- if (localExecutionIsAllowed()) {
- ClusterUtil.runOnMiniCluster(this.jobGraphBuilder.getJobGraph(jobName),
- getExecutionParallelism());
- } else {
- ClusterUtil.runOnLocalCluster(this.jobGraphBuilder.getJobGraph(jobName),
- getExecutionParallelism());
- }
-
+ ClusterUtil.runOnMiniCluster(this.jobGraphBuilder.getJobGraph(jobName),
+ getDegreeOfParallelism());
}
public void executeTest(long memorySize) throws Exception {
- ClusterUtil.runOnMiniCluster(this.jobGraphBuilder.getJobGraph(), getExecutionParallelism(),
+ ClusterUtil.runOnMiniCluster(this.jobGraphBuilder.getJobGraph(), getDegreeOfParallelism(),
memorySize);
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6867f9b9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index 864e18d..d833c8e 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -20,9 +20,8 @@ package org.apache.flink.streaming.api.environment;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.flink.client.program.Client;
import org.apache.flink.client.program.JobWithJars;
@@ -30,13 +29,15 @@ import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
private static final Logger LOG = LoggerFactory.getLogger(RemoteStreamEnvironment.class);
private String host;
private int port;
- private String[] jarFiles;
+ private List<File> jarFiles;
/**
* Creates a new RemoteStreamEnvironment that points to the master
@@ -65,19 +66,28 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
this.host = host;
this.port = port;
- this.jarFiles = jarFiles;
+ this.jarFiles = new ArrayList<File>();
+ for (int i = 0; i < jarFiles.length; i++) {
+ File file = new File(jarFiles[i]);
+ try {
+ JobWithJars.checkJarFile(file);
+ } catch (IOException e) {
+ throw new RuntimeException("Problem with jar file " + file.getAbsolutePath(), e);
+ }
+ this.jarFiles.add(file);
+ }
}
@Override
public void execute() {
-
+
JobGraph jobGraph = jobGraphBuilder.getJobGraph();
executeRemotely(jobGraph);
}
-
+
@Override
public void execute(String jobName) {
-
+
JobGraph jobGraph = jobGraphBuilder.getJobGraph(jobName);
executeRemotely(jobGraph);
}
@@ -85,25 +95,21 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
/**
* Executes the remote job.
*
- * @param jobGraph jobGraph to execute
+ * @param jobGraph
+ * jobGraph to execute
*/
private void executeRemotely(JobGraph jobGraph) {
if (LOG.isInfoEnabled()) {
LOG.info("Running remotely at {}:{}", host, port);
}
- for (int i = 0; i < jarFiles.length; i++) {
- File file = new File(jarFiles[i]);
- try {
- JobWithJars.checkJarFile(file);
- } catch (IOException e) {
- throw new RuntimeException("Problem with jar file " + file.getAbsolutePath(), e);
- }
+ for (File file : jarFiles) {
jobGraph.addJar(new Path(file.getAbsolutePath()));
}
Configuration configuration = jobGraph.getJobConfiguration();
- Client client = new Client(new InetSocketAddress(host, port), configuration, getClass().getClassLoader());
+ Client client = new Client(new InetSocketAddress(host, port), configuration,
+ JobWithJars.buildUserCodeClassLoader(jarFiles, JobWithJars.class.getClassLoader()));
try {
client.run(jobGraph, true);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6867f9b9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
new file mode 100644
index 0000000..c157435
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.environment;
+
+import java.io.File;
+import java.util.List;
+
+import org.apache.flink.client.program.Client;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+public class StreamContextEnvironment extends StreamExecutionEnvironment {
+
+ protected static ClassLoader userClassLoader;
+ protected List<File> jars;
+ protected Client client;
+
+ protected StreamContextEnvironment(Client client, List<File> jars, int dop) {
+ this.client = client;
+ this.jars = jars;
+ if (dop > 0) {
+ setDegreeOfParallelism(dop);
+ } else {
+ setDegreeOfParallelism(GlobalConfiguration.getInteger(
+ ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE_KEY,
+ ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE));
+ }
+ }
+
+ @Override
+ public void execute() throws Exception {
+ execute(null);
+ }
+
+ @Override
+ public void execute(String jobName) throws Exception {
+
+ JobGraph jobGraph;
+ if (jobName == null) {
+ jobGraph = this.jobGraphBuilder.getJobGraph();
+ } else {
+ jobGraph = this.jobGraphBuilder.getJobGraph(jobName);
+ }
+
+ for (File file : jars) {
+ jobGraph.addJar(new Path(file.getAbsolutePath()));
+ }
+
+ try {
+ client.run(jobGraph, true);
+
+ } catch (Exception e) {
+ throw e;
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6867f9b9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 0d00db3..600a87a 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -20,10 +20,13 @@ package org.apache.flink.streaming.api.environment;
import java.io.File;
import java.io.Serializable;
import java.util.Collection;
+import java.util.List;
import org.apache.commons.lang3.SerializationException;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.client.program.Client;
+import org.apache.flink.client.program.ContextEnvironment;
import org.apache.flink.streaming.api.JobGraphBuilder;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
@@ -44,21 +47,10 @@ import org.apache.flink.streaming.util.serialization.TypeWrapper;
*/
public abstract class StreamExecutionEnvironment {
- /**
- * The environment of the context (local by default, cluster if invoked
- * through command line)
- */
- private static StreamExecutionEnvironment contextEnvironment;
-
- /** flag to disable local executor when using the ContextEnvironment */
- private static boolean allowLocalExecution = true;
-
private static int defaultLocalDop = Runtime.getRuntime().availableProcessors();
private int degreeOfParallelism = 1;
- private int executionParallelism = -1;
-
private long bufferTimeout = 100;
protected JobGraphBuilder jobGraphBuilder;
@@ -74,10 +66,6 @@ public abstract class StreamExecutionEnvironment {
jobGraphBuilder = new JobGraphBuilder();
}
- public int getExecutionParallelism() {
- return executionParallelism == -1 ? degreeOfParallelism : executionParallelism;
- }
-
/**
* Gets the degree of parallelism with which operation are executed by
* default. Operations can individually override this value to use a
@@ -143,21 +131,6 @@ public abstract class StreamExecutionEnvironment {
return this.bufferTimeout;
}
- /**
- * Sets the number of hardware contexts (CPU cores / threads) used when
- * executed in {@link LocalStreamEnvironment}.
- *
- * @param degreeOfParallelism
- * The degree of parallelism in local environment
- */
- public void setExecutionParallelism(int degreeOfParallelism) {
- if (degreeOfParallelism < 1) {
- throw new IllegalArgumentException("Degree of parallelism must be at least one.");
- }
-
- this.executionParallelism = degreeOfParallelism;
- }
-
// --------------------------------------------------------------------------------------------
// Data stream creations
// --------------------------------------------------------------------------------------------
@@ -351,8 +324,19 @@ public abstract class StreamExecutionEnvironment {
* executed.
*/
public static StreamExecutionEnvironment getExecutionEnvironment() {
- allowLocalExecution = ExecutionEnvironment.localExecutionIsAllowed();
- return createLocalEnvironment();
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ if (env instanceof ContextEnvironment) {
+ ContextEnvironment ctx = (ContextEnvironment) env;
+ return createContextEnvironment(ctx.getClient(), ctx.getJars(),
+ ctx.getDegreeOfParallelism());
+ } else {
+ return createLocalEnvironment();
+ }
+ }
+
+ private static StreamExecutionEnvironment createContextEnvironment(Client client,
+ List<File> jars, int dop) {
+ return new StreamContextEnvironment(client, jars, dop);
}
/**
@@ -440,27 +424,6 @@ public abstract class StreamExecutionEnvironment {
return rec;
}
- // --------------------------------------------------------------------------------------------
- // Methods to control the context and local environments for execution from
- // packaged programs
- // --------------------------------------------------------------------------------------------
-
- protected static void initializeContextEnvironment(StreamExecutionEnvironment ctx) {
- contextEnvironment = ctx;
- }
-
- protected static boolean isContextEnvironmentSet() {
- return contextEnvironment != null;
- }
-
- protected static void disableLocalExecution() {
- allowLocalExecution = false;
- }
-
- public static boolean localExecutionIsAllowed() {
- return allowLocalExecution;
- }
-
/**
* Triggers the program execution. The environment will execute all parts of
* the program that have resulted in a "sink" operation. Sink operations are
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6867f9b9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
index 5a6519d..2464ff2 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
@@ -29,8 +29,7 @@ import org.apache.flink.streaming.io.CoReaderIterator;
import org.apache.flink.streaming.io.CoRecordReader;
import org.apache.flink.util.MutableObjectIterator;
-public class CoStreamVertex<IN1, IN2, OUT> extends
- StreamVertex<IN1,OUT> {
+public class CoStreamVertex<IN1, IN2, OUT> extends StreamVertex<IN1, OUT> {
private OutputHandler<OUT> outputHandler;
@@ -53,10 +52,10 @@ public class CoStreamVertex<IN1, IN2, OUT> extends
}
private void setDeserializers() {
- TypeInformation<IN1> inputTypeInfo1 = configuration.getTypeInfoIn1();
+ TypeInformation<IN1> inputTypeInfo1 = configuration.getTypeInfoIn1(userClassLoader);
inputDeserializer1 = new StreamRecordSerializer<IN1>(inputTypeInfo1);
- TypeInformation<IN2> inputTypeInfo2 = configuration.getTypeInfoIn2();
+ TypeInformation<IN2> inputTypeInfo2 = configuration.getTypeInfoIn2(userClassLoader);
inputDeserializer2 = new StreamRecordSerializer<IN2>(inputTypeInfo2);
}
@@ -72,7 +71,7 @@ public class CoStreamVertex<IN1, IN2, OUT> extends
@Override
protected void setInvokable() {
- userInvokable = configuration.getUserInvokable();
+ userInvokable = configuration.getUserInvokable(userClassLoader);
userInvokable.initialize(outputHandler.getCollector(), coIter, inputDeserializer1,
inputDeserializer2, isMutable);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6867f9b9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
index 17d2ae5..9d65a21 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
@@ -35,10 +35,10 @@ public class InputHandler<IN> {
private MutableObjectIterator<StreamRecord<IN>> inputIter;
private MutableReader<IOReadableWritable> inputs;
- private StreamVertex<IN,?> streamVertex;
+ private StreamVertex<IN, ?> streamVertex;
private StreamConfig configuration;
- public InputHandler(StreamVertex<IN,?> streamComponent) {
+ public InputHandler(StreamVertex<IN, ?> streamComponent) {
this.streamVertex = streamComponent;
this.configuration = new StreamConfig(streamComponent.getTaskConfiguration());
try {
@@ -75,7 +75,8 @@ public class InputHandler<IN> {
}
private void setDeserializer() {
- TypeInformation<IN> inTupleTypeInfo = configuration.getTypeInfoIn1();
+ TypeInformation<IN> inTupleTypeInfo = configuration
+ .getTypeInfoIn1(streamVertex.userClassLoader);
if (inTupleTypeInfo != null) {
inputSerializer = new StreamRecordSerializer<IN>(inTupleTypeInfo);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6867f9b9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
index 8b72195..d8eb146 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
@@ -81,7 +81,8 @@ public class OutputHandler<OUT> {
private StreamCollector<OUT> setCollector() {
if (streamVertex.configuration.getDirectedEmit()) {
- OutputSelector<OUT> outputSelector = streamVertex.configuration.getOutputSelector();
+ OutputSelector<OUT> outputSelector = streamVertex.configuration
+ .getOutputSelector(streamVertex.userClassLoader);
collector = new DirectedStreamCollector<OUT>(streamVertex.getInstanceID(),
outSerializationDelegate, outputSelector);
@@ -97,7 +98,7 @@ public class OutputHandler<OUT> {
}
void setSerializers() {
- outTypeInfo = configuration.getTypeInfoOut1();
+ outTypeInfo = configuration.getTypeInfoOut1(streamVertex.userClassLoader);
if (outTypeInfo != null) {
outSerializer = new StreamRecordSerializer<OUT>(outTypeInfo);
outSerializationDelegate = new SerializationDelegate<StreamRecord<OUT>>(outSerializer);
@@ -110,7 +111,8 @@ public class OutputHandler<OUT> {
StreamPartitioner<OUT> outputPartitioner = null;
try {
- outputPartitioner = configuration.getPartitioner(outputNumber);
+ outputPartitioner = configuration.getPartitioner(streamVertex.userClassLoader,
+ outputNumber);
} catch (Exception e) {
throw new StreamVertexException("Cannot deserialize partitioner for "
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6867f9b9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
index a8ec98f..2db0d8b 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
@@ -44,6 +44,8 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable {
private StreamingRuntimeContext context;
private Map<String, OperatorState<?>> states;
+ protected ClassLoader userClassLoader;
+
public StreamVertex() {
userInvokable = null;
numTasks = newVertex();
@@ -63,12 +65,13 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable {
}
protected void initialize() {
+ this.userClassLoader = getUserCodeClassLoader();
this.configuration = new StreamConfig(getTaskConfiguration());
this.name = configuration.getVertexName();
this.isMutable = configuration.getMutability();
this.functionName = configuration.getFunctionName();
- this.function = configuration.getFunction();
- this.states = configuration.getOperatorStates();
+ this.function = configuration.getFunction(userClassLoader);
+ this.states = configuration.getOperatorStates(userClassLoader);
this.context = createRuntimeContext(name, this.states);
}
@@ -85,7 +88,7 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable {
}
protected void setInvokable() {
- userInvokable = configuration.getUserInvokable();
+ userInvokable = configuration.getUserInvokable(userClassLoader);
userInvokable.initialize(outputHandler.getCollector(), inputHandler.getInputIter(),
inputHandler.getInputSerializer(), isMutable);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6867f9b9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
index 278cb5a..ebe383d 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
@@ -19,10 +19,8 @@ package org.apache.flink.streaming.util;
import java.net.InetSocketAddress;
-import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.client.minicluster.NepheleMiniCluster;
import org.apache.flink.client.program.Client;
-import org.apache.flink.client.program.ContextEnvironment;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -72,44 +70,12 @@ public class ClusterUtil {
} catch (Exception e) {
throw e;
} finally {
- try {
- exec.stop();
- } catch (Throwable t) {
- }
+ exec.stop();
}
}
- public static void runOnLocalCluster(JobGraph jobGraph, int degreeOfPrallelism)
- throws Exception {
-
- if (LOG.isInfoEnabled()) {
- LOG.info("Running on mini cluster");
- }
-
- try {
-
- Client client = ((ContextEnvironment) ExecutionEnvironment.getExecutionEnvironment())
- .getClient();
-
- client.run(jobGraph, true);
- } catch (ProgramInvocationException e) {
- if (e.getMessage().contains("GraphConversionException")) {
- throw new Exception(CANNOT_EXECUTE_EMPTY_JOB, e);
- } else {
- throw e;
- }
- } catch (Exception e) {
- throw e;
- } finally {
- try {
- } catch (Throwable t) {
- }
- }
- }
-
- public static void runOnMiniCluster(JobGraph jobGraph, int numberOfTaskTrackers)
- throws Exception {
- runOnMiniCluster(jobGraph, numberOfTaskTrackers, -1);
+ public static void runOnMiniCluster(JobGraph jobGraph, int numOfSlots) throws Exception {
+ runOnMiniCluster(jobGraph, numOfSlots, -1);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6867f9b9/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
index e01809d..765de9c 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
@@ -112,11 +112,6 @@ public class StreamVertexTest {
}
try {
- env.setExecutionParallelism(-10);
- fail();
- } catch (IllegalArgumentException e) {
- }
- try {
env.generateSequence(1, 10).project(2);
fail();
} catch (RuntimeException e) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6867f9b9/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
index 82d81f4..e2094fb 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
@@ -73,13 +73,13 @@ public class IterateExample {
input.add(new Tuple2<Double, Integer>(0., 0));
}
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(2)
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
.setBufferTimeout(1);
IterativeDataStream<Tuple2<Double, Integer>> it = env.fromCollection(input).iterate()
.setMaxWaitTime(3000);
- SplitDataStream<Tuple2<Double,Integer>> step = it.map(new Step()).shuffle().setParallelism(2).split(new MySelector());
+ SplitDataStream<Tuple2<Double,Integer>> step = it.map(new Step()).shuffle().split(new MySelector());
it.closeWith(step.select("iterate"));
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6867f9b9/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
index 4f91514..89b301a 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
@@ -83,4 +83,8 @@ public class ContextEnvironment extends ExecutionEnvironment {
public Client getClient() {
return this.client;
}
+
+ public List<File> getJars(){
+ return jarFilesToAttach;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6867f9b9/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java b/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java
index b8151da..b86487f 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java
@@ -102,7 +102,7 @@ public class JobWithJars {
// TODO: Check if proper JAR file
}
- static ClassLoader buildUserCodeClassLoader(List<File> jars, ClassLoader parent) {
+ public static ClassLoader buildUserCodeClassLoader(List<File> jars, ClassLoader parent) {
URL[] urls = new URL[jars.size()];
try {