You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/06/09 20:30:59 UTC
[24/30] Offer buffer-oriented API for I/O (#25)
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/union/UnionJob.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/union/UnionJob.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/union/UnionJob.java
deleted file mode 100644
index d9c09d3..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/union/UnionJob.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.example.union;
-
-import java.io.File;
-import java.io.IOException;
-
-import eu.stratosphere.configuration.Configuration;
-import eu.stratosphere.core.fs.Path;
-import eu.stratosphere.nephele.client.JobClient;
-import eu.stratosphere.nephele.io.DistributionPattern;
-import eu.stratosphere.nephele.io.channels.ChannelType;
-import eu.stratosphere.nephele.jobgraph.JobFileInputVertex;
-import eu.stratosphere.nephele.jobgraph.JobFileOutputVertex;
-import eu.stratosphere.nephele.jobgraph.JobGraph;
-import eu.stratosphere.nephele.jobgraph.JobGraphDefinitionException;
-import eu.stratosphere.nephele.jobgraph.JobTaskVertex;
-import eu.stratosphere.nephele.util.JarFileCreator;
-
-public class UnionJob {
-
- public static void main(final String[] args) {
-
- // Create graph and define vertices
- final JobGraph unionGraph = new JobGraph("Union Job");
-
- final JobFileInputVertex input1 = new JobFileInputVertex("Input 1", unionGraph);
- input1.setFileInputClass(ProducerTask.class);
- input1.setFilePath(new Path("file:///tmp/"));
-
- final JobFileInputVertex input2 = new JobFileInputVertex("Input 2", unionGraph);
- input2.setFileInputClass(ProducerTask.class);
- input2.setFilePath(new Path("file:///tmp/"));
-
- final JobTaskVertex union = new JobTaskVertex("Union", unionGraph);
- union.setTaskClass(UnionTask.class);
-
- final JobFileOutputVertex output = new JobFileOutputVertex("Output", unionGraph);
- output.setFileOutputClass(ConsumerTask.class);
- output.setFilePath(new Path("file:///tmp/"));
-
- // Create edges between vertices
- try {
- input1.connectTo(union, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
- input2.connectTo(union, ChannelType.NETWORK, DistributionPattern.POINTWISE);
- union.connectTo(output, ChannelType.INMEMORY);
- } catch (JobGraphDefinitionException e) {
- e.printStackTrace();
- return;
- }
-
- // Create jar file and attach it
- final File jarFile = new File("/tmp/unionJob.jar");
- final JarFileCreator jarFileCreator = new JarFileCreator(jarFile);
- jarFileCreator.addClass(ProducerTask.class);
- jarFileCreator.addClass(UnionTask.class);
- jarFileCreator.addClass(ConsumerTask.class);
-
- try {
- jarFileCreator.createJarFile();
- } catch (IOException ioe) {
-
- ioe.printStackTrace();
-
- if (jarFile.exists()) {
- jarFile.delete();
- }
-
- return;
- }
-
- //Define instance sharing
- input1.setVertexToShareInstancesWith(output);
- input2.setVertexToShareInstancesWith(output);
- union.setVertexToShareInstancesWith(output);
-
- unionGraph.addJar(new Path("file://" + jarFile.getAbsolutePath()));
-
- final Configuration conf = new Configuration();
- conf.setString("jobmanager.rpc.address", "localhost");
-
- try {
- final JobClient jobClient = new JobClient(unionGraph, conf);
- jobClient.submitJobAndWait();
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- if (jarFile.exists()) {
- jarFile.delete();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/union/UnionTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/union/UnionTask.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/union/UnionTask.java
deleted file mode 100644
index d99eb42..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/union/UnionTask.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.example.union;
-
-import eu.stratosphere.core.io.StringRecord;
-import eu.stratosphere.nephele.io.MutableRecordReader;
-import eu.stratosphere.nephele.io.RecordWriter;
-import eu.stratosphere.nephele.io.UnionRecordReader;
-import eu.stratosphere.nephele.template.AbstractTask;
-
-public class UnionTask extends AbstractTask {
-
- private UnionRecordReader<StringRecord> input;
-
- private RecordWriter<StringRecord> output;
-
- @Override
- public void registerInputOutput() {
- @SuppressWarnings("unchecked")
- MutableRecordReader<StringRecord>[] recordReaders = (MutableRecordReader<StringRecord>[]) new MutableRecordReader<?>[2];
- recordReaders[0] = new MutableRecordReader<StringRecord>(this);
- recordReaders[1] = new MutableRecordReader<StringRecord>(this);
-
- this.input = new UnionRecordReader<StringRecord>(recordReaders, StringRecord.class);
- this.output = new RecordWriter<StringRecord>(this, StringRecord.class);
- }
-
- @Override
- public void invoke() throws Exception {
- while (this.input.hasNext()) {
- this.output.emit(this.input.next());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/CancelTaskException.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/CancelTaskException.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/CancelTaskException.java
new file mode 100644
index 0000000..6a09e89
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/CancelTaskException.java
@@ -0,0 +1,25 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+package eu.stratosphere.nephele.execution;
+
+
+import eu.stratosphere.pact.runtime.task.chaining.ExceptionInChainedStubException;
+
+/**
+ * Thrown to trigger the cancellation of the executing task. Intended to result in a canceled status rather than a failed status.
+ */
+public class CancelTaskException extends RuntimeException {
+ private static final long serialVersionUID = 1L;
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/Environment.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/Environment.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/Environment.java
index a606dcf..e0bcc70 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/Environment.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/Environment.java
@@ -1,5 +1,5 @@
/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
@@ -13,30 +13,25 @@
package eu.stratosphere.nephele.execution;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.FutureTask;
-
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.core.fs.Path;
import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.nephele.io.ChannelSelector;
-import eu.stratosphere.nephele.io.GateID;
-import eu.stratosphere.nephele.io.InputGate;
-import eu.stratosphere.nephele.io.OutputGate;
-import eu.stratosphere.nephele.io.RecordDeserializerFactory;
-import eu.stratosphere.nephele.io.channels.ChannelID;
+import eu.stratosphere.runtime.io.channels.ChannelID;
+import eu.stratosphere.runtime.io.gates.GateID;
+import eu.stratosphere.runtime.io.gates.InputGate;
+import eu.stratosphere.runtime.io.gates.OutputGate;
+import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.protocols.AccumulatorProtocol;
import eu.stratosphere.nephele.services.iomanager.IOManager;
import eu.stratosphere.nephele.services.memorymanager.MemoryManager;
+import eu.stratosphere.runtime.io.network.bufferprovider.BufferProvider;
import eu.stratosphere.nephele.template.InputSplitProvider;
/**
* The user code of every Nephele task runs inside an <code>Environment</code> object. The environment provides
* important services to the task. It keeps track of setting up the communication channels and provides access to input
* splits, memory manager, etc.
- *
*/
public interface Environment {
/**
@@ -49,9 +44,9 @@ public interface Environment {
JobID getJobID();
/**
- * Returns the task configuration object which was attached to the original {@link JobVertex}.
+ * Returns the task configuration object which was attached to the original JobVertex.
*
- * @return the task configuration object which was attached to the original {@link JobVertex}
+ * @return the task configuration object which was attached to the original JobVertex.
*/
Configuration getTaskConfiguration();
@@ -128,13 +123,6 @@ public interface Environment {
GateID getNextUnboundInputGateID();
/**
- * Returns the next unbound output gate ID or <code>null</code> if no such ID exists
- *
- * @return the next unbound output gate ID or <code>null</code> if no such ID exists
- */
- GateID getNextUnboundOutputGateID();
-
- /**
* Returns the number of output gates registered with this environment.
*
* @return the number of output gates registered with this environment
@@ -163,46 +151,16 @@ public interface Environment {
int getNumberOfInputChannels();
/**
- * Creates an output gate.
- *
- * @param gateID
- * @param outputClass
- * @param selector
- * @param isBroadcast
- * @param <T>
- * The type of the record consumed by the output gate.
- * @return The created output gate.
+ * Creates a new OutputGate and registers it with the Environment.
+ *
+ * @return the newly created output gate
*/
- <T extends IOReadableWritable> OutputGate<T> createOutputGate(GateID gateID, Class<T> outputClass,
- ChannelSelector<T> selector, boolean isBroadcast);
+ OutputGate createAndRegisterOutputGate();
/**
- * Creates an input gate.
- *
- * @param gateID
- * @param deserializer
- * @param distributionPattern
- * @param <T>
- * The type of the record read from the input gate.
- * @return The created input gate.
+ * Creates a new InputGate and registers it with the Environment.
*/
- <T extends IOReadableWritable> InputGate<T> createInputGate(GateID gateID, RecordDeserializerFactory<T> deserializerFactory);
-
- /**
- * Registers an output gate with this environment.
- *
- * @param outputGate
- * the output gate to be registered
- */
- void registerOutputGate(OutputGate<? extends IOReadableWritable> outputGate);
-
- /**
- * Registers an input gate with this environment.
- *
- * @param inputGate
- * the input gate to be registered
- */
- void registerInputGate(InputGate<? extends IOReadableWritable> inputGate);
+ <T extends IOReadableWritable> InputGate<T> createAndRegisterInputGate();
/**
* Returns the IDs of all output channels connected to this environment.
@@ -255,5 +213,13 @@ public interface Environment {
*/
AccumulatorProtocol getAccumulatorProtocolProxy();
- Map<String, FutureTask<Path>> getCopyTask();
+ /**
+ * Returns the buffer provider for this environment.
+ * <p>
+ * The returned buffer provider is used by the output side of the network stack.
+ *
+ * @return Buffer provider for the output side of the network stack
+ * @see eu.stratosphere.runtime.io.api.RecordWriter
+ */
+ BufferProvider getOutputBufferProvider();
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/ExecutionStateTransition.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/ExecutionStateTransition.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/ExecutionStateTransition.java
index 7d7cebb..55f036a 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/ExecutionStateTransition.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/ExecutionStateTransition.java
@@ -97,29 +97,9 @@ public final class ExecutionStateTransition {
unexpectedStateChange = false;
}
- // This transition can appear if a task cannot be deployed at the assigned task manager.
- else if (oldState == ExecutionState.STARTING && newState == ExecutionState.FAILED) {
- unexpectedStateChange = false;
- }
-
// -------------- error cases --------------
- else if (newState == FAILED) {
- // any state may fail
- unexpectedStateChange = false;
- }
-
- // This is a regular transition as a result of a cancel operation.
- else if (oldState == ExecutionState.RUNNING && newState == ExecutionState.CANCELING) {
- unexpectedStateChange = false;
- }
-
- // This is a regular transition as a result of a cancel operation.
- else if (oldState == ExecutionState.FINISHING && newState == ExecutionState.CANCELING) {
- unexpectedStateChange = false;
- }
-
- // This is a regular transition as a result of a cancel operation.
- else if (oldState == ExecutionState.CANCELING && newState == ExecutionState.CANCELED) {
+ else if (newState == FAILED || newState == CANCELED || newState == CANCELING) {
+ // any state may fail or cancel itself
unexpectedStateChange = false;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/RuntimeEnvironment.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/RuntimeEnvironment.java
index a8ca11c..59787d2 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/RuntimeEnvironment.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/RuntimeEnvironment.java
@@ -36,21 +36,25 @@ import eu.stratosphere.nephele.deployment.ChannelDeploymentDescriptor;
import eu.stratosphere.nephele.deployment.GateDeploymentDescriptor;
import eu.stratosphere.nephele.deployment.TaskDeploymentDescriptor;
import eu.stratosphere.nephele.execution.librarycache.LibraryCacheManager;
-import eu.stratosphere.nephele.io.ChannelSelector;
-import eu.stratosphere.nephele.io.GateID;
-import eu.stratosphere.nephele.io.InputGate;
-import eu.stratosphere.nephele.io.OutputGate;
-import eu.stratosphere.nephele.io.RecordDeserializerFactory;
-import eu.stratosphere.nephele.io.RuntimeInputGate;
-import eu.stratosphere.nephele.io.RuntimeOutputGate;
-import eu.stratosphere.nephele.io.channels.ChannelID;
-import eu.stratosphere.nephele.io.channels.ChannelType;
+import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.protocols.AccumulatorProtocol;
import eu.stratosphere.nephele.services.iomanager.IOManager;
import eu.stratosphere.nephele.services.memorymanager.MemoryManager;
import eu.stratosphere.nephele.template.AbstractInvokable;
import eu.stratosphere.nephele.template.InputSplitProvider;
+import eu.stratosphere.runtime.io.Buffer;
+import eu.stratosphere.runtime.io.channels.ChannelID;
+import eu.stratosphere.runtime.io.channels.ChannelType;
+import eu.stratosphere.runtime.io.channels.OutputChannel;
+import eu.stratosphere.runtime.io.gates.GateID;
+import eu.stratosphere.runtime.io.gates.InputGate;
+import eu.stratosphere.runtime.io.gates.OutputGate;
+import eu.stratosphere.runtime.io.network.bufferprovider.BufferAvailabilityListener;
+import eu.stratosphere.runtime.io.network.bufferprovider.BufferProvider;
+import eu.stratosphere.runtime.io.network.bufferprovider.GlobalBufferPool;
+import eu.stratosphere.runtime.io.network.bufferprovider.LocalBufferPool;
+import eu.stratosphere.runtime.io.network.bufferprovider.LocalBufferPoolOwner;
import eu.stratosphere.util.StringUtils;
/**
@@ -60,7 +64,7 @@ import eu.stratosphere.util.StringUtils;
* <p>
* This class is thread-safe.
*/
-public class RuntimeEnvironment implements Environment, Runnable {
+public class RuntimeEnvironment implements Environment, BufferProvider, LocalBufferPoolOwner, Runnable {
/**
* The log object used for debugging.
@@ -75,7 +79,7 @@ public class RuntimeEnvironment implements Environment, Runnable {
/**
* List of output gates created by the task.
*/
- private final List<OutputGate<? extends IOReadableWritable>> outputGates = new CopyOnWriteArrayList<OutputGate<? extends IOReadableWritable>>();
+ private final List<OutputGate> outputGates = new CopyOnWriteArrayList<OutputGate>();
/**
* List of input gates created by the task.
@@ -83,12 +87,6 @@ public class RuntimeEnvironment implements Environment, Runnable {
private final List<InputGate<? extends IOReadableWritable>> inputGates = new CopyOnWriteArrayList<InputGate<? extends IOReadableWritable>>();
/**
- * Queue of unbound output gate IDs which are required for deserializing an environment in the course of an RPC
- * call.
- */
- private final Queue<GateID> unboundOutputGateIDs = new ArrayDeque<GateID>();
-
- /**
* Queue of unbound input gate IDs which are required for deserializing an environment in the course of an RPC
* call.
*/
@@ -145,7 +143,7 @@ public class RuntimeEnvironment implements Environment, Runnable {
private volatile ExecutionObserver executionObserver = null;
/**
- * The RPC procy to report accumulators to JobManager
+ * The RPC proxy to report accumulators to JobManager
*/
private AccumulatorProtocol accumulatorProtocolProxy = null;
@@ -164,7 +162,7 @@ public class RuntimeEnvironment implements Environment, Runnable {
*/
private final String taskName;
- private Map<String,FutureTask<Path>> cacheCopyTasks = new HashMap<String, FutureTask<Path>>();
+ private LocalBufferPool outputBufferPool;
/**
* Creates a new runtime environment object which contains the runtime information for the encapsulated Nephele
@@ -177,7 +175,7 @@ public class RuntimeEnvironment implements Environment, Runnable {
* @param invokableClass
* the class that should be instantiated as a Nephele task
* @param taskConfiguration
- * the configuration object which was attached to the original {@link JobVertex}
+ * the configuration object which was attached to the original JobVertex
* @param jobConfiguration
* the configuration object which was attached to the original {@link JobGraph}
* @throws Exception
@@ -240,60 +238,16 @@ public class RuntimeEnvironment implements Environment, Runnable {
this.invokable.setEnvironment(this);
this.invokable.registerInputOutput();
- if (!this.unboundOutputGateIDs.isEmpty() && LOG.isErrorEnabled()) {
- LOG.error("Inconsistency: " + this.unboundOutputGateIDs.size() + " unbound output gate IDs left");
- }
+ int numOutputGates = tdd.getNumberOfOutputGateDescriptors();
- if (!this.unboundInputGateIDs.isEmpty() && LOG.isErrorEnabled()) {
- LOG.error("Inconsistency: " + this.unboundInputGateIDs.size() + " unbound output gate IDs left");
+ for (int i = 0; i < numOutputGates; ++i) {
+ this.outputGates.get(i).initializeChannels(tdd.getOutputGateDescriptor(i));
}
- final int noogdd = tdd.getNumberOfOutputGateDescriptors();
- for (int i = 0; i < noogdd; ++i) {
- final GateDeploymentDescriptor gdd = tdd.getOutputGateDescriptor(i);
- final OutputGate og = this.outputGates.get(i);
- final ChannelType channelType = gdd.getChannelType();
- og.setChannelType(channelType);
+ int numInputGates = tdd.getNumberOfInputGateDescriptors();
- final int nocdd = gdd.getNumberOfChannelDescriptors();
- for (int j = 0; j < nocdd; ++j) {
-
- final ChannelDeploymentDescriptor cdd = gdd.getChannelDescriptor(j);
- switch (channelType) {
- case NETWORK:
- og.createNetworkOutputChannel(og, cdd.getOutputChannelID(), cdd.getInputChannelID());
- break;
- case INMEMORY:
- og.createInMemoryOutputChannel(og, cdd.getOutputChannelID(), cdd.getInputChannelID());
- break;
- default:
- throw new IllegalStateException("Unknown channel type");
- }
- }
- }
-
- final int noigdd = tdd.getNumberOfInputGateDescriptors();
- for (int i = 0; i < noigdd; ++i) {
- final GateDeploymentDescriptor gdd = tdd.getInputGateDescriptor(i);
- final InputGate ig = this.inputGates.get(i);
- final ChannelType channelType = gdd.getChannelType();
- ig.setChannelType(channelType);
-
- final int nicdd = gdd.getNumberOfChannelDescriptors();
- for (int j = 0; j < nicdd; ++j) {
-
- final ChannelDeploymentDescriptor cdd = gdd.getChannelDescriptor(j);
- switch (channelType) {
- case NETWORK:
- ig.createNetworkInputChannel(ig, cdd.getInputChannelID(), cdd.getOutputChannelID());
- break;
- case INMEMORY:
- ig.createInMemoryInputChannel(ig, cdd.getInputChannelID(), cdd.getOutputChannelID());
- break;
- default:
- throw new IllegalStateException("Unknown channel type");
- }
- }
+ for(int i = 0; i < numInputGates; i++){
+ this.inputGates.get(i).initializeChannels(tdd.getInputGateDescriptor(i));
}
}
@@ -306,29 +260,26 @@ public class RuntimeEnvironment implements Environment, Runnable {
return this.invokable;
}
-
@Override
public JobID getJobID() {
return this.jobID;
}
-
@Override
public GateID getNextUnboundInputGateID() {
-
return this.unboundInputGateIDs.poll();
}
+ @Override
+ public OutputGate createAndRegisterOutputGate() {
+ OutputGate gate = new OutputGate(getJobID(), new GateID(), getNumberOfOutputGates());
+ this.outputGates.add(gate);
- public GateID getNextUnboundOutputGateID() {
-
- return this.unboundOutputGateIDs.poll();
+ return gate;
}
-
@Override
public void run() {
-
if (invokable == null) {
LOG.fatal("ExecutionEnvironment has no Invokable set");
}
@@ -343,9 +294,6 @@ public class RuntimeEnvironment implements Environment, Runnable {
}
try {
-
- // Activate input channels
- // activateInputChannels();
ClassLoader cl = LibraryCacheManager.getClassLoader(jobID);
Thread.currentThread().setContextClassLoader(cl);
this.invokable.invoke();
@@ -354,9 +302,7 @@ public class RuntimeEnvironment implements Environment, Runnable {
if (this.executionObserver.isCanceled()) {
throw new InterruptedException();
}
-
} catch (Throwable t) {
-
if (!this.executionObserver.isCanceled()) {
// Perform clean up when the task failed and has not been canceled by the user
@@ -370,7 +316,7 @@ public class RuntimeEnvironment implements Environment, Runnable {
// Release all resources that may currently be allocated by the individual channels
releaseAllChannelResources();
- if (this.executionObserver.isCanceled()) {
+ if (this.executionObserver.isCanceled() || t instanceof CancelTaskException) {
changeExecutionState(ExecutionState.CANCELED, null);
} else {
changeExecutionState(ExecutionState.FAILED, StringUtils.stringifyException(t));
@@ -399,7 +345,7 @@ public class RuntimeEnvironment implements Environment, Runnable {
// Release all resources that may currently be allocated by the individual channels
releaseAllChannelResources();
- if (this.executionObserver.isCanceled()) {
+ if (this.executionObserver.isCanceled() || t instanceof CancelTaskException) {
changeExecutionState(ExecutionState.CANCELED, null);
} else {
changeExecutionState(ExecutionState.FAILED, StringUtils.stringifyException(t));
@@ -415,62 +361,35 @@ public class RuntimeEnvironment implements Environment, Runnable {
changeExecutionState(ExecutionState.FINISHED, null);
}
-
@Override
- public <T extends IOReadableWritable> OutputGate<T> createOutputGate(final GateID gateID, Class<T> outputClass,
- final ChannelSelector<T> selector, final boolean isBroadcast) {
- final RuntimeOutputGate<T> rog = new RuntimeOutputGate<T>(getJobID(), gateID, outputClass,
- getNumberOfOutputGates(), selector, isBroadcast);
- return rog;
- }
+ public <T extends IOReadableWritable> InputGate<T> createAndRegisterInputGate() {
+ InputGate<T> gate = new InputGate<T>(getJobID(), new GateID(), getNumberOfInputGates());
+ this.inputGates.add(gate);
-
- @Override
- public <T extends IOReadableWritable> InputGate<T> createInputGate(final GateID gateID,
- final RecordDeserializerFactory<T> deserializerFactory) {
- final RuntimeInputGate<T> rig = new RuntimeInputGate<T>(getJobID(), gateID, deserializerFactory,
- getNumberOfInputGates());
- return rig;
- }
-
- @Override
- public void registerOutputGate(OutputGate<? extends IOReadableWritable> outputGate) {
-
- this.outputGates.add(outputGate);
+ return gate;
}
- @Override
- public void registerInputGate(InputGate<? extends IOReadableWritable> inputGate) {
- this.inputGates.add(inputGate);
- }
-
-
public int getNumberOfOutputGates() {
return this.outputGates.size();
}
-
@Override
public int getNumberOfInputGates() {
return this.inputGates.size();
}
-
@Override
public int getNumberOfOutputChannels() {
-
int numberOfOutputChannels = 0;
for (int i = 0; i < this.outputGates.size(); ++i) {
- numberOfOutputChannels += this.outputGates.get(i).getNumberOfOutputChannels();
+ numberOfOutputChannels += this.outputGates.get(i).getNumChannels();
}
return numberOfOutputChannels;
}
-
@Override
public int getNumberOfInputChannels() {
-
int numberOfInputChannels = 0;
for (int i = 0; i < this.inputGates.size(); ++i) {
numberOfInputChannels += this.inputGates.get(i).getNumberOfInputChannels();
@@ -497,13 +416,13 @@ public class RuntimeEnvironment implements Environment, Runnable {
/**
* Returns the registered output gate with index <code>index</code>.
*
- * @param pos
+ * @param index
* the index of the output gate to return
* @return the output gate at index <code>index</code> or <code>null</code> if no such index exists
*/
- public OutputGate<? extends IOReadableWritable> getOutputGate(final int pos) {
- if (pos < this.outputGates.size()) {
- return this.outputGates.get(pos);
+ public OutputGate getOutputGate(int index) {
+ if (index < this.outputGates.size()) {
+ return this.outputGates.get(index);
}
return null;
@@ -515,7 +434,6 @@ public class RuntimeEnvironment implements Environment, Runnable {
* @return the thread which is assigned to execute the user code
*/
public Thread getExecutingThread() {
-
synchronized (this) {
if (this.executingThread == null) {
@@ -538,29 +456,14 @@ public class RuntimeEnvironment implements Environment, Runnable {
* @throws InterruptedException
* thrown if the thread waiting for the channels to be closed is interrupted
*/
- private void waitForOutputChannelsToBeClosed() throws IOException, InterruptedException {
-
- // Wait for disconnection of all output gates
- while (true) {
-
- // Make sure, we leave this method with an InterruptedException when the task has been canceled
- if (this.executionObserver.isCanceled()) {
- throw new InterruptedException();
- }
-
- boolean allClosed = true;
- for (int i = 0; i < getNumberOfOutputGates(); i++) {
- final OutputGate<? extends IOReadableWritable> og = this.outputGates.get(i);
- if (!og.isClosed()) {
- allClosed = false;
- }
- }
+ private void waitForOutputChannelsToBeClosed() throws InterruptedException {
+ // If the task has been canceled, return immediately instead of waiting for the output gates to be closed
+ if (this.executionObserver.isCanceled()) {
+ return;
+ }
- if (allClosed) {
- break;
- } else {
- Thread.sleep(SLEEPINTERVAL);
- }
+ for (OutputGate og : this.outputGates) {
+ og.waitForGateToBeClosed();
}
}
@@ -573,7 +476,6 @@ public class RuntimeEnvironment implements Environment, Runnable {
* thrown if the thread waiting for the channels to be closed is interrupted
*/
private void waitForInputChannelsToBeClosed() throws IOException, InterruptedException {
-
// Wait for disconnection of all input gates
while (true) {
@@ -602,7 +504,6 @@ public class RuntimeEnvironment implements Environment, Runnable {
* Closes all input gates which are not already closed.
*/
private void closeInputGates() throws IOException, InterruptedException {
-
for (int i = 0; i < this.inputGates.size(); i++) {
final InputGate<? extends IOReadableWritable> eig = this.inputGates.get(i);
// Important: close must be called on each input gate exactly once
@@ -615,61 +516,49 @@ public class RuntimeEnvironment implements Environment, Runnable {
* Requests all output gates to be closed.
*/
private void requestAllOutputGatesToClose() throws IOException, InterruptedException {
-
for (int i = 0; i < this.outputGates.size(); i++) {
this.outputGates.get(i).requestClose();
}
}
-
@Override
public IOManager getIOManager() {
return this.ioManager;
}
-
@Override
public MemoryManager getMemoryManager() {
return this.memoryManager;
}
-
@Override
public Configuration getTaskConfiguration() {
return this.taskConfiguration;
}
-
@Override
public Configuration getJobConfiguration() {
return this.jobConfiguration;
}
-
@Override
public int getCurrentNumberOfSubtasks() {
-
return this.currentNumberOfSubtasks;
}
-
@Override
public int getIndexInSubtaskGroup() {
-
return this.indexInSubtaskGroup;
}
private void changeExecutionState(final ExecutionState newExecutionState, final String optionalMessage) {
-
if (this.executionObserver != null) {
this.executionObserver.executionStateChanged(newExecutionState, optionalMessage);
}
}
-
@Override
public String getTaskName() {
-
return this.taskName;
}
@@ -679,9 +568,7 @@ public class RuntimeEnvironment implements Environment, Runnable {
* @return the name of the task with its index in the subtask group and the total number of subtasks
*/
public String getTaskNameWithIndex() {
-
- return this.taskName + " (" + (getIndexInSubtaskGroup() + 1) + "/"
- + getCurrentNumberOfSubtasks() + ")";
+ return String.format("%s (%d/%d)", this.taskName, getIndexInSubtaskGroup() + 1, getCurrentNumberOfSubtasks());
}
/**
@@ -694,25 +581,20 @@ public class RuntimeEnvironment implements Environment, Runnable {
this.executionObserver = executionObserver;
}
-
@Override
public InputSplitProvider getInputSplitProvider() {
return this.inputSplitProvider;
}
-
@Override
public void userThreadStarted(final Thread userThread) {
-
if (this.executionObserver != null) {
this.executionObserver.userThreadStarted(userThread);
}
}
-
@Override
public void userThreadFinished(final Thread userThread) {
-
if (this.executionObserver != null) {
this.executionObserver.userThreadFinished(userThread);
}
@@ -723,7 +605,6 @@ public class RuntimeEnvironment implements Environment, Runnable {
* method should only be called after the respected task has stopped running.
*/
private void releaseAllChannelResources() {
-
for (int i = 0; i < this.inputGates.size(); i++) {
this.inputGates.get(i).releaseAllChannelResources();
}
@@ -733,28 +614,21 @@ public class RuntimeEnvironment implements Environment, Runnable {
}
}
-
@Override
public Set<ChannelID> getOutputChannelIDs() {
+ Set<ChannelID> ids= new HashSet<ChannelID>();
- final Set<ChannelID> outputChannelIDs = new HashSet<ChannelID>();
-
- final Iterator<OutputGate<? extends IOReadableWritable>> gateIterator = this.outputGates.iterator();
- while (gateIterator.hasNext()) {
-
- final OutputGate<? extends IOReadableWritable> outputGate = gateIterator.next();
- for (int i = 0; i < outputGate.getNumberOfOutputChannels(); ++i) {
- outputChannelIDs.add(outputGate.getOutputChannel(i).getID());
+ for (OutputGate gate : this.outputGates) {
+ for (OutputChannel channel : gate.channels()) {
+ ids.add(channel.getID());
}
}
- return Collections.unmodifiableSet(outputChannelIDs);
+ return Collections.unmodifiableSet(ids);
}
-
@Override
public Set<ChannelID> getInputChannelIDs() {
-
final Set<ChannelID> inputChannelIDs = new HashSet<ChannelID>();
final Iterator<InputGate<? extends IOReadableWritable>> gateIterator = this.inputGates.iterator();
@@ -769,10 +643,8 @@ public class RuntimeEnvironment implements Environment, Runnable {
return Collections.unmodifiableSet(inputChannelIDs);
}
-
@Override
public Set<GateID> getInputGateIDs() {
-
final Set<GateID> inputGateIDs = new HashSet<GateID>();
final Iterator<InputGate<? extends IOReadableWritable>> gateIterator = this.inputGates.iterator();
@@ -783,13 +655,11 @@ public class RuntimeEnvironment implements Environment, Runnable {
return Collections.unmodifiableSet(inputGateIDs);
}
-
@Override
public Set<GateID> getOutputGateIDs() {
-
final Set<GateID> outputGateIDs = new HashSet<GateID>();
- final Iterator<OutputGate<? extends IOReadableWritable>> gateIterator = this.outputGates.iterator();
+ final Iterator<OutputGate> gateIterator = this.outputGates.iterator();
while (gateIterator.hasNext()) {
outputGateIDs.add(gateIterator.next().getGateID());
}
@@ -800,11 +670,10 @@ public class RuntimeEnvironment implements Environment, Runnable {
@Override
public Set<ChannelID> getOutputChannelIDsOfGate(final GateID gateID) {
-
- OutputGate<? extends IOReadableWritable> outputGate = null;
- final Iterator<OutputGate<? extends IOReadableWritable>> gateIterator = this.outputGates.iterator();
+ OutputGate outputGate = null;
+ final Iterator<OutputGate> gateIterator = this.outputGates.iterator();
while (gateIterator.hasNext()) {
- final OutputGate<? extends IOReadableWritable> candidateGate = gateIterator.next();
+ final OutputGate candidateGate = gateIterator.next();
if (candidateGate.getGateID().equals(gateID)) {
outputGate = candidateGate;
break;
@@ -817,8 +686,8 @@ public class RuntimeEnvironment implements Environment, Runnable {
final Set<ChannelID> outputChannelIDs = new HashSet<ChannelID>();
- for (int i = 0; i < outputGate.getNumberOfOutputChannels(); ++i) {
- outputChannelIDs.add(outputGate.getOutputChannel(i).getID());
+ for (int i = 0; i < outputGate.getNumChannels(); ++i) {
+ outputChannelIDs.add(outputGate.getChannel(i).getID());
}
return Collections.unmodifiableSet(outputChannelIDs);
@@ -827,7 +696,6 @@ public class RuntimeEnvironment implements Environment, Runnable {
@Override
public Set<ChannelID> getInputChannelIDsOfGate(final GateID gateID) {
-
InputGate<? extends IOReadableWritable> inputGate = null;
final Iterator<InputGate<? extends IOReadableWritable>> gateIterator = this.inputGates.iterator();
while (gateIterator.hasNext()) {
@@ -850,18 +718,86 @@ public class RuntimeEnvironment implements Environment, Runnable {
return Collections.unmodifiableSet(inputChannelIDs);
}
+
+ public List<OutputGate> outputGates() {
+ return this.outputGates;
+ }
+
+ public List<InputGate<? extends IOReadableWritable>> inputGates() {
+ return this.inputGates;
+ }
@Override
public AccumulatorProtocol getAccumulatorProtocolProxy() {
return accumulatorProtocolProxy;
}
- public void addCopyTaskForCacheFile(String name, FutureTask<Path> copyTask) {
- this.cacheCopyTasks.put(name, copyTask);
+ @Override
+ public BufferProvider getOutputBufferProvider() {
+ return this;
}
+
+ // -----------------------------------------------------------------------------------------------------------------
+ // BufferProvider methods
+ // -----------------------------------------------------------------------------------------------------------------
+
@Override
- public Map<String, FutureTask<Path>> getCopyTask() {
- return this.cacheCopyTasks;
+ public Buffer requestBuffer(int minBufferSize) throws IOException {
+ return this.outputBufferPool.requestBuffer(minBufferSize);
}
+
+ @Override
+ public Buffer requestBufferBlocking(int minBufferSize) throws IOException, InterruptedException {
+ return this.outputBufferPool.requestBufferBlocking(minBufferSize);
+ }
+
+ @Override
+ public int getBufferSize() {
+ return this.outputBufferPool.getBufferSize();
+ }
+
+ @Override
+ public boolean registerBufferAvailabilityListener(BufferAvailabilityListener listener) {
+ return this.outputBufferPool.registerBufferAvailabilityListener(listener);
+ }
+
+ // -----------------------------------------------------------------------------------------------------------------
+ // LocalBufferPoolOwner methods
+ // -----------------------------------------------------------------------------------------------------------------
+ @Override
+ public int getNumberOfChannels() {
+ return getNumberOfOutputChannels();
+ }
+
+ @Override
+ public void setDesignatedNumberOfBuffers(int numBuffers) {
+ this.outputBufferPool.setNumDesignatedBuffers(numBuffers);
+ }
+
+ @Override
+ public void clearLocalBufferPool() {
+ this.outputBufferPool.destroy();
+ }
+
+ @Override
+ public void registerGlobalBufferPool(GlobalBufferPool globalBufferPool) {
+ if (this.outputBufferPool == null) {
+ this.outputBufferPool = new LocalBufferPool(globalBufferPool, 1);
+ }
+ }
+
+ @Override
+ public void logBufferUtilization() {
+ LOG.info(String.format("\t%s: %d available, %d requested, %d designated",
+ getTaskNameWithIndex(),
+ this.outputBufferPool.numAvailableBuffers(),
+ this.outputBufferPool.numRequestedBuffers(),
+ this.outputBufferPool.numDesignatedBuffers()));
+ }
+
+ @Override
+ public void reportAsynchronousEvent() {
+ this.outputBufferPool.reportAsynchronousEvent();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/DistributionPatternProvider.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/DistributionPatternProvider.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/DistributionPatternProvider.java
index 0c004ad..35350cc 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/DistributionPatternProvider.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/DistributionPatternProvider.java
@@ -13,7 +13,7 @@
package eu.stratosphere.nephele.executiongraph;
-import eu.stratosphere.nephele.io.DistributionPattern;
+import eu.stratosphere.nephele.jobgraph.DistributionPattern;
public final class DistributionPatternProvider {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionEdge.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionEdge.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionEdge.java
index cbb957f..0106361 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionEdge.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionEdge.java
@@ -13,11 +13,11 @@
package eu.stratosphere.nephele.executiongraph;
-import eu.stratosphere.nephele.io.channels.ChannelID;
-import eu.stratosphere.nephele.io.channels.ChannelType;
+import eu.stratosphere.runtime.io.channels.ChannelID;
+import eu.stratosphere.runtime.io.channels.ChannelType;
/**
- * Objects of this class represent a pair of {@link AbstractInputChannel} and {@link AbstractOutputChannel} objects
+ * Objects of this class represent a pair of {@link eu.stratosphere.runtime.io.channels.InputChannel} and {@link AbstractOutputChannel} objects
* within an {@link ExecutionGraph}, Nephele's internal scheduling representation for jobs.
*
*/
@@ -85,11 +85,6 @@ public final class ExecutionEdge {
return this.groupEdge.getChannelType();
}
- public boolean isBroadcast() {
-
- return this.groupEdge.isBroadcast();
- }
-
public int getConnectionID() {
return this.groupEdge.getConnectionID();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGate.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGate.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGate.java
index f1bcb76..20d1a4e 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGate.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGate.java
@@ -16,8 +16,8 @@ package eu.stratosphere.nephele.executiongraph;
import java.util.Collection;
import java.util.concurrent.CopyOnWriteArrayList;
-import eu.stratosphere.nephele.io.GateID;
-import eu.stratosphere.nephele.io.channels.ChannelType;
+import eu.stratosphere.runtime.io.gates.GateID;
+import eu.stratosphere.runtime.io.channels.ChannelType;
/**
* Objects of this class represent either an {@link InputGate} or {@link OutputGate} within an {@link ExecutionGraph},
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGraph.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGraph.java
index aa08655..ca7eddb 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGraph.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGraph.java
@@ -40,17 +40,17 @@ import eu.stratosphere.nephele.instance.AllocatedResource;
import eu.stratosphere.nephele.instance.DummyInstance;
import eu.stratosphere.nephele.instance.InstanceManager;
import eu.stratosphere.nephele.instance.InstanceType;
-import eu.stratosphere.nephele.io.DistributionPattern;
-import eu.stratosphere.nephele.io.GateID;
-import eu.stratosphere.nephele.io.channels.ChannelID;
-import eu.stratosphere.nephele.io.channels.ChannelType;
+import eu.stratosphere.nephele.jobgraph.DistributionPattern;
+import eu.stratosphere.runtime.io.gates.GateID;
+import eu.stratosphere.runtime.io.channels.ChannelID;
+import eu.stratosphere.runtime.io.channels.ChannelType;
import eu.stratosphere.nephele.jobgraph.AbstractJobInputVertex;
import eu.stratosphere.nephele.jobgraph.AbstractJobVertex;
import eu.stratosphere.nephele.jobgraph.JobEdge;
import eu.stratosphere.nephele.jobgraph.JobFileOutputVertex;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.nephele.jobgraph.JobID;
-import eu.stratosphere.nephele.taskmanager.runtime.ExecutorThreadFactory;
+import eu.stratosphere.nephele.taskmanager.ExecutorThreadFactory;
import eu.stratosphere.nephele.template.AbstractInputTask;
import eu.stratosphere.nephele.template.AbstractInvokable;
import eu.stratosphere.util.StringUtils;
@@ -412,9 +412,6 @@ public class ExecutionGraph implements ExecutionListener {
// First, build the group edges
for (int i = 0; i < sjv.getNumberOfForwardConnections(); ++i) {
-
- final boolean isBroadcast = sgv.getEnvironment().getOutputGate(i).isBroadcast();
-
final JobEdge edge = sjv.getForwardConnection(i);
final AbstractJobVertex tjv = edge.getConnectedVertex();
@@ -427,13 +424,12 @@ public class ExecutionGraph implements ExecutionListener {
userDefinedChannelType = false;
channelType = ChannelType.NETWORK;
}
- // Use NO_COMPRESSION as default compression level if nothing else is defined by the user
final DistributionPattern distributionPattern = edge.getDistributionPattern();
// Connect the corresponding group vertices and copy the user settings from the job edge
final ExecutionGroupEdge groupEdge = sgv.wireTo(tgv, edge.getIndexOfInputGate(), i, channelType,
- userDefinedChannelType,distributionPattern, isBroadcast);
+ userDefinedChannelType,distributionPattern);
final ExecutionGate outputGate = new ExecutionGate(new GateID(), sev, groupEdge, false);
sev.insertOutputGate(i, outputGate);
@@ -970,7 +966,7 @@ public class ExecutionGraph implements ExecutionListener {
final ExecutionGate outputGate = sourceVertex.getOutputGate(i);
final ChannelType channelType = outputGate.getChannelType();
- if (channelType == ChannelType.INMEMORY) {
+ if (channelType == ChannelType.IN_MEMORY) {
final int numberOfOutputChannels = outputGate.getNumberOfEdges();
for (int j = 0; j < numberOfOutputChannels; ++j) {
final ExecutionEdge outputChannel = outputGate.getEdge(j);
@@ -990,7 +986,7 @@ public class ExecutionGraph implements ExecutionListener {
final ExecutionGate inputGate = targetVertex.getInputGate(i);
final ChannelType channelType = inputGate.getChannelType();
- if (channelType == ChannelType.INMEMORY) {
+ if (channelType == ChannelType.IN_MEMORY) {
final int numberOfInputChannels = inputGate.getNumberOfEdges();
for (int j = 0; j < numberOfInputChannels; ++j) {
final ExecutionEdge inputChannel = inputGate.getEdge(j);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGroupEdge.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGroupEdge.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGroupEdge.java
index 14970fd..373d120 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGroupEdge.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGroupEdge.java
@@ -13,8 +13,8 @@
package eu.stratosphere.nephele.executiongraph;
-import eu.stratosphere.nephele.io.DistributionPattern;
-import eu.stratosphere.nephele.io.channels.ChannelType;
+import eu.stratosphere.runtime.io.channels.ChannelType;
+import eu.stratosphere.nephele.jobgraph.DistributionPattern;
/**
* An execution group edge represents an edge between two execution group vertices.
@@ -66,11 +66,6 @@ public class ExecutionGroupEdge {
private final DistributionPattern distributionPattern;
/**
- * Stores if the edge is part of a broadcast group.
- */
- private final boolean isBroadcast;
-
- /**
* Constructs a new group edge.
*
* @param sourceVertex
@@ -85,19 +80,12 @@ public class ExecutionGroupEdge {
* the channel type for the edge
* @param userDefinedChannelType
* <code>true</code> if the channel type has been specified by the user, <code>false</code> otherwise
- * @param compressionLevel
- * the compression level for the edge
- * @param userDefinedCompressionLevel
- * <code>true</code> if the compression level has been specified by the user, <code>false</code> otherwise
* @param distributionPattern
* the distribution pattern to create the wiring
- * @param isBroadcast
- * indicates that the edge is part of a broadcast group
*/
public ExecutionGroupEdge(final ExecutionGroupVertex sourceVertex, final int indexOfOutputGate,
final ExecutionGroupVertex targetVertex, final int indexOfInputGate, final ChannelType channelType,
- final boolean userDefinedChannelType, final DistributionPattern distributionPattern,
- final boolean isBroadcast) {
+ final boolean userDefinedChannelType, final DistributionPattern distributionPattern) {
this.sourceVertex = sourceVertex;
this.indexOfOutputGate = indexOfOutputGate;
this.channelType = channelType;
@@ -105,7 +93,6 @@ public class ExecutionGroupEdge {
this.userDefinedChannelType = userDefinedChannelType;
this.targetVertex = targetVertex;
this.distributionPattern = distributionPattern;
- this.isBroadcast = isBroadcast;
}
/**
@@ -206,13 +193,4 @@ public class ExecutionGroupEdge {
public DistributionPattern getDistributionPattern() {
return this.distributionPattern;
}
-
- /**
- * Checks if the edge is part of a broadcast group.
- *
- * @return <code>true</code> if the edge is part of a broadcast group, <code>false</code> otherwise
- */
- public boolean isBroadcast() {
- return this.isBroadcast;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGroupVertex.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGroupVertex.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGroupVertex.java
index 908eb6e..89b4b6d 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGroupVertex.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGroupVertex.java
@@ -13,6 +13,17 @@
package eu.stratosphere.nephele.executiongraph;
+import eu.stratosphere.configuration.Configuration;
+import eu.stratosphere.core.io.InputSplit;
+import eu.stratosphere.nephele.execution.RuntimeEnvironment;
+import eu.stratosphere.nephele.instance.AllocatedResource;
+import eu.stratosphere.nephele.instance.DummyInstance;
+import eu.stratosphere.nephele.instance.InstanceType;
+import eu.stratosphere.nephele.jobgraph.JobVertexID;
+import eu.stratosphere.nephele.template.AbstractInvokable;
+import eu.stratosphere.runtime.io.channels.ChannelType;
+import eu.stratosphere.nephele.jobgraph.DistributionPattern;
+
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
@@ -23,17 +34,6 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
-import eu.stratosphere.configuration.Configuration;
-import eu.stratosphere.core.io.InputSplit;
-import eu.stratosphere.nephele.execution.RuntimeEnvironment;
-import eu.stratosphere.nephele.instance.AllocatedResource;
-import eu.stratosphere.nephele.instance.DummyInstance;
-import eu.stratosphere.nephele.instance.InstanceType;
-import eu.stratosphere.nephele.io.DistributionPattern;
-import eu.stratosphere.nephele.io.channels.ChannelType;
-import eu.stratosphere.nephele.jobgraph.JobVertexID;
-import eu.stratosphere.nephele.template.AbstractInvokable;
-
/**
* An ExecutionGroupVertex is created for every JobVertex of the initial job graph. It represents a number of execution
* vertices that originate from the same job vertex.
@@ -388,7 +388,7 @@ public final class ExecutionGroupVertex {
*/
ExecutionGroupEdge wireTo(final ExecutionGroupVertex groupVertex, final int indexOfInputGate,
final int indexOfOutputGate, final ChannelType channelType, final boolean userDefinedChannelType,
- final DistributionPattern distributionPattern, final boolean isBroadcast) throws GraphConversionException {
+ final DistributionPattern distributionPattern) throws GraphConversionException {
try {
final ExecutionGroupEdge previousEdge = this.forwardLinks.get(indexOfOutputGate);
@@ -401,8 +401,7 @@ public final class ExecutionGroupVertex {
}
final ExecutionGroupEdge edge = new ExecutionGroupEdge(this, indexOfOutputGate, groupVertex, indexOfInputGate,
- channelType, userDefinedChannelType, distributionPattern,
- isBroadcast);
+ channelType, userDefinedChannelType, distributionPattern);
this.forwardLinks.add(edge);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionStage.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionStage.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionStage.java
index a251d3c..eab2375 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionStage.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionStage.java
@@ -27,7 +27,7 @@ import eu.stratosphere.nephele.instance.AbstractInstance;
import eu.stratosphere.nephele.instance.DummyInstance;
import eu.stratosphere.nephele.instance.InstanceRequestMap;
import eu.stratosphere.nephele.instance.InstanceType;
-import eu.stratosphere.nephele.io.channels.ChannelType;
+import eu.stratosphere.runtime.io.channels.ChannelType;
/**
* An execution stage contains all execution group vertices (and as a result all execution vertices) which
@@ -401,7 +401,7 @@ public final class ExecutionStage {
recurse = true;
}
- if (channelType == ChannelType.INMEMORY
+ if (channelType == ChannelType.IN_MEMORY
&& !pipeline.equals(connectedVertex.getExecutionPipeline())) {
connectedVertex.setExecutionPipeline(pipeline);
@@ -432,7 +432,7 @@ public final class ExecutionStage {
recurse = true;
}
- if (channelType == ChannelType.INMEMORY
+ if (channelType == ChannelType.IN_MEMORY
&& !pipeline.equals(connectedVertex.getExecutionPipeline())) {
connectedVertex.setExecutionPipeline(pipeline);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionVertex.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionVertex.java
index 7a9c48b..57ff073 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionVertex.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionVertex.java
@@ -35,11 +35,10 @@ import eu.stratosphere.nephele.execution.ExecutionState;
import eu.stratosphere.nephele.execution.ExecutionStateTransition;
import eu.stratosphere.nephele.instance.AllocatedResource;
import eu.stratosphere.nephele.instance.AllocationID;
-import eu.stratosphere.nephele.io.GateID;
+import eu.stratosphere.runtime.io.gates.GateID;
import eu.stratosphere.nephele.taskmanager.AbstractTaskResult;
import eu.stratosphere.nephele.taskmanager.AbstractTaskResult.ReturnCode;
import eu.stratosphere.nephele.taskmanager.TaskCancelResult;
-import eu.stratosphere.nephele.taskmanager.TaskKillResult;
import eu.stratosphere.nephele.taskmanager.TaskSubmissionResult;
import eu.stratosphere.nephele.util.AtomicEnum;
import eu.stratosphere.nephele.util.SerializableArrayList;
@@ -692,41 +691,6 @@ public final class ExecutionVertex {
}
/**
- * Kills and removes the task represented by this vertex from the instance it is currently running on. If the
- * corresponding task is not in the state <code>RUNNING</code>, this call will be ignored. If the call has been
- * executed
- * successfully, the task will change the state <code>FAILED</code>.
- *
- * @return the result of the task kill attempt
- */
- public TaskKillResult killTask() {
-
- final ExecutionState state = this.executionState.get();
-
- if (state != ExecutionState.RUNNING) {
- final TaskKillResult result = new TaskKillResult(getID(), AbstractTaskResult.ReturnCode.ILLEGAL_STATE);
- result.setDescription("Vertex " + this.toString() + " is in state " + state);
- return result;
- }
-
- final AllocatedResource ar = this.allocatedResource.get();
-
- if (ar == null) {
- final TaskKillResult result = new TaskKillResult(getID(), AbstractTaskResult.ReturnCode.NO_INSTANCE);
- result.setDescription("Assigned instance of vertex " + this.toString() + " is null!");
- return result;
- }
-
- try {
- return ar.getInstance().killTask(this.vertexID);
- } catch (IOException e) {
- final TaskKillResult result = new TaskKillResult(getID(), AbstractTaskResult.ReturnCode.IPC_ERROR);
- result.setDescription(StringUtils.stringifyException(e));
- return result;
- }
- }
-
- /**
* Cancels and removes the task represented by this vertex
* from the instance it is currently running on. If the task
* is not currently running, its execution state is simply
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionVertexID.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionVertexID.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionVertexID.java
index 266b6ef..6209fde 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionVertexID.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionVertexID.java
@@ -13,7 +13,7 @@
package eu.stratosphere.nephele.executiongraph;
-import eu.stratosphere.nephele.io.AbstractID;
+import eu.stratosphere.nephele.AbstractID;
import eu.stratosphere.nephele.managementgraph.ManagementVertexID;
/**
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ManagementGraphFactory.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ManagementGraphFactory.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ManagementGraphFactory.java
index cd768e2..72e3651 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ManagementGraphFactory.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ManagementGraphFactory.java
@@ -18,7 +18,7 @@ import java.util.Iterator;
import java.util.Map;
import eu.stratosphere.nephele.instance.AbstractInstance;
-import eu.stratosphere.nephele.io.channels.ChannelType;
+import eu.stratosphere.runtime.io.channels.ChannelType;
import eu.stratosphere.nephele.managementgraph.ManagementEdge;
import eu.stratosphere.nephele.managementgraph.ManagementEdgeID;
import eu.stratosphere.nephele.managementgraph.ManagementGate;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/AbstractInstance.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/AbstractInstance.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/AbstractInstance.java
index 898f8e9..50e0e7f 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/AbstractInstance.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/AbstractInstance.java
@@ -24,13 +24,12 @@ import eu.stratosphere.nephele.execution.librarycache.LibraryCacheProfileRequest
import eu.stratosphere.nephele.execution.librarycache.LibraryCacheProfileResponse;
import eu.stratosphere.nephele.execution.librarycache.LibraryCacheUpdate;
import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
-import eu.stratosphere.nephele.io.channels.ChannelID;
+import eu.stratosphere.runtime.io.channels.ChannelID;
import eu.stratosphere.nephele.ipc.RPC;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.net.NetUtils;
import eu.stratosphere.nephele.protocols.TaskOperationProtocol;
import eu.stratosphere.nephele.taskmanager.TaskCancelResult;
-import eu.stratosphere.nephele.taskmanager.TaskKillResult;
import eu.stratosphere.nephele.taskmanager.TaskSubmissionResult;
import eu.stratosphere.nephele.topology.NetworkNode;
import eu.stratosphere.nephele.topology.NetworkTopology;
@@ -96,8 +95,8 @@ public abstract class AbstractInstance extends NetworkNode {
if (this.taskManager == null) {
this.taskManager = RPC.getProxy(TaskOperationProtocol.class,
- new InetSocketAddress(getInstanceConnectionInfo().getAddress(),
- getInstanceConnectionInfo().getIPCPort()), NetUtils.getSocketFactory());
+ new InetSocketAddress(getInstanceConnectionInfo().address(),
+ getInstanceConnectionInfo().ipcPort()), NetUtils.getSocketFactory());
}
return this.taskManager;
@@ -206,22 +205,6 @@ public abstract class AbstractInstance extends NetworkNode {
return getTaskManagerProxy().cancelTask(id);
}
- /**
- * Kills the task identified by the given ID at the instance's
- * {@link eu.stratosphere.nephele.taskmanager.TaskManager}.
- *
- * @param id
- * the ID identifying the task to be killed
- * @throws IOException
- * thrown if an error occurs while transmitting the request or receiving the response
- * @return the result of the kill attempt
- */
- public synchronized TaskKillResult killTask(final ExecutionVertexID id) throws IOException {
-
- return getTaskManagerProxy().killTask(id);
- }
-
-
@Override
public boolean equals(final Object obj) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/AllocationID.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/AllocationID.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/AllocationID.java
index 7e8339f..3c83b80 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/AllocationID.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/AllocationID.java
@@ -13,7 +13,7 @@
package eu.stratosphere.nephele.instance;
-import eu.stratosphere.nephele.io.AbstractID;
+import eu.stratosphere.nephele.AbstractID;
/**
* An allocation ID unambiguously identifies the allocated resources
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceConnectionInfo.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceConnectionInfo.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceConnectionInfo.java
index a0fd608..257421b 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceConnectionInfo.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceConnectionInfo.java
@@ -154,7 +154,8 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In
*
* @return the port instance's task manager runs its IPC service on
*/
- public int getIPCPort() {
+ public int ipcPort() {
+
return this.ipcPort;
}
@@ -163,7 +164,8 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In
*
* @return the port instance's task manager expects to receive transfer envelopes on
*/
- public int getDataPort() {
+ public int dataPort() {
+
return this.dataPort;
}
@@ -172,7 +174,8 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In
*
* @return the network address the instance's task manager binds its sockets to
*/
- public InetAddress getAddress() {
+ public InetAddress address() {
+
return this.inetAddress;
}
@@ -182,7 +185,8 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In
*
* @return the host name of the instance
*/
- public String getHostName() {
+ public String hostname() {
+
return this.hostName;
}
@@ -191,7 +195,8 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In
*
* @return the domain name of the instance or <code>null</code> if the domain name could not be determined
*/
- public String getDomainName() {
+ public String domainName() {
+
return this.domainName;
}
@@ -251,15 +256,15 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In
if (obj instanceof InstanceConnectionInfo) {
InstanceConnectionInfo ici = (InstanceConnectionInfo) obj;
- if (!this.inetAddress.equals(ici.getAddress())) {
+ if (!this.inetAddress.equals(ici.address())) {
return false;
}
- if (this.ipcPort != ici.getIPCPort()) {
+ if (this.ipcPort != ici.ipcPort()) {
return false;
}
- if (this.dataPort != ici.getDataPort()) {
+ if (this.dataPort != ici.dataPort()) {
return false;
}
@@ -278,7 +283,9 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In
@Override
public int compareTo(final InstanceConnectionInfo o) {
- return this.getAddress().getHostName().compareTo(((InstanceConnectionInfo) o).getAddress().getHostName());
+
+ return this.address().getHostName()
+ .compareTo(((InstanceConnectionInfo) o).address().getHostName());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceID.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceID.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceID.java
index c93b6bd..9b5b707 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceID.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceID.java
@@ -13,7 +13,7 @@
package eu.stratosphere.nephele.instance;
-import eu.stratosphere.nephele.io.AbstractID;
+import eu.stratosphere.nephele.AbstractID;
/**
* Class for statistically unique instance IDs.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/cluster/ClusterManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/cluster/ClusterManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/cluster/ClusterManager.java
index b69d23e..480e521 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/cluster/ClusterManager.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/cluster/ClusterManager.java
@@ -396,14 +396,14 @@ public class ClusterManager implements InstanceManager {
final HardwareDescription hardwareDescription) {
// Check if there is a user-defined instance type for this IP address
- InstanceType instanceType = this.ipToInstanceTypeMapping.get(instanceConnectionInfo.getAddress());
+ InstanceType instanceType = this.ipToInstanceTypeMapping.get(instanceConnectionInfo.address());
if (instanceType != null) {
LOG.info("Found user-defined instance type for cluster instance with IP "
- + instanceConnectionInfo.getAddress() + ": " + instanceType);
+ + instanceConnectionInfo.address() + ": " + instanceType);
} else {
instanceType = matchHardwareDescriptionWithInstanceType(hardwareDescription);
if (instanceType != null) {
- LOG.info("Hardware profile of cluster instance with IP " + instanceConnectionInfo.getAddress()
+ LOG.info("Hardware profile of cluster instance with IP " + instanceConnectionInfo.address()
+ " matches with instance type " + instanceType);
} else {
LOG.error("No matching instance type, cannot create cluster instance");
@@ -412,7 +412,7 @@ public class ClusterManager implements InstanceManager {
}
// Try to match new host with a stub host from the existing topology
- String instanceName = instanceConnectionInfo.getHostName();
+ String instanceName = instanceConnectionInfo.hostname();
NetworkNode parentNode = this.networkTopology.getRootNode();
NetworkNode currentStubNode = null;
@@ -439,7 +439,7 @@ public class ClusterManager implements InstanceManager {
// Try to match the new host using the IP address
if (currentStubNode == null) {
- instanceName = instanceConnectionInfo.getAddress().toString();
+ instanceName = instanceConnectionInfo.address().toString();
instanceName = instanceName.replaceAll("/", ""); // Remove any / characters
currentStubNode = this.networkTopology.getNodeByName(instanceName);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/AbstractGate.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/AbstractGate.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/AbstractGate.java
deleted file mode 100644
index c3e3697..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/AbstractGate.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
-import eu.stratosphere.nephele.event.task.EventListener;
-import eu.stratosphere.nephele.event.task.EventNotificationManager;
-import eu.stratosphere.nephele.io.channels.ChannelType;
-import eu.stratosphere.nephele.jobgraph.JobID;
-
-/**
- * In Nephele a gate represents the connection between a user program and the processing framework. A gate
- * must be connected to exactly one record reader/writer and to at least one channel. The <code>Gate</code> class itself
- * is abstract. A gate automatically created for every record reader/writer in the user program. A gate can only be used
- * to transport one specific type of records.
- * <p>
- * This class in general is not thread-safe.
- *
- * @param <T>
- * the record type to be transported from this gate
- */
-public abstract class AbstractGate<T extends IOReadableWritable> implements Gate<T> {
-
- /**
- * The ID of the job this gate belongs to.
- */
- private final JobID jobID;
-
- /**
- * The ID of this gate.
- */
- private final GateID gateID;
-
- /**
- * The index of the gate in the list of available input/output gates.
- */
- private final int index;
-
- /**
- * The event notification manager used to dispatch events.
- */
- private final EventNotificationManager eventNotificationManager = new EventNotificationManager();
-
- /**
- * The type of input/output channels connected to this gate.
- */
- private ChannelType channelType = ChannelType.NETWORK;
-
- /**
- * Constructs a new abstract gate
- *
- * @param jobID
- * the ID of the job this gate belongs to
- * @param gateID
- * the ID of this gate
- * @param index
- * the index of the gate in the list of available input/output gates.
- */
- protected AbstractGate(final JobID jobID, final GateID gateID, final int index) {
- this.jobID = jobID;
- this.gateID = gateID;
- this.index = index;
- }
-
-
- @Override
- public final int getIndex() {
- return this.index;
- }
-
- /**
- * Returns the event notification manager used to dispatch events.
- *
- * @return the event notification manager used to dispatch events
- */
- protected final EventNotificationManager getEventNotificationManager() {
- return this.eventNotificationManager;
- }
-
-
- @Override
- public String toString() {
-
- return "Gate " + this.index;
- }
-
-
- @Override
- public final void subscribeToEvent(EventListener eventListener, Class<? extends AbstractTaskEvent> eventType) {
-
- this.eventNotificationManager.subscribeToEvent(eventListener, eventType);
- }
-
-
- @Override
- public final void unsubscribeFromEvent(final EventListener eventListener,
- final Class<? extends AbstractTaskEvent> eventType) {
-
- this.eventNotificationManager.unsubscribeFromEvent(eventListener, eventType);
- }
-
-
- @Override
- public final void deliverEvent(final AbstractTaskEvent event) {
-
- this.eventNotificationManager.deliverEvent((AbstractTaskEvent) event);
- }
-
-
- @Override
- public final void setChannelType(final ChannelType channelType) {
-
- this.channelType = channelType;
- }
-
-
- @Override
- public final ChannelType getChannelType() {
-
- return this.channelType;
- }
-
-
- @Override
- public JobID getJobID() {
-
- return this.jobID;
- }
-
-
- @Override
- public GateID getGateID() {
-
- return this.gateID;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/AbstractID.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/AbstractID.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/AbstractID.java
deleted file mode 100644
index 37e6cbb..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/AbstractID.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.util.StringUtils;
-
-/**
- * ID is an abstract base class for providing statistically unique identification numbers in Nephele.
- * Every component that requires these kinds of IDs provides its own concrete type.
- * <p>
- * This class is thread-safe.
- *
- */
-public abstract class AbstractID implements IOReadableWritable {
-
- /**
- * The size of a long in bytes.
- */
- private static final int SIZE_OF_LONG = 8;
-
- /**
- * The size of the ID in byte.
- */
- protected static final int SIZE = 2 * SIZE_OF_LONG;
-
- /**
- * The upper part of the actual ID.
- */
- private long upperPart;
-
- /**
- * The lower part of the actual ID.
- */
- private long lowerPart;
-
- /**
- * Constructs a new ID with a specific bytes value.
- */
- public AbstractID(final byte[] bytes) {
-
- if (bytes.length != SIZE) {
- throw new IllegalArgumentException("Argument bytes must by an array of " + SIZE + " bytes");
- }
-
- this.lowerPart = byteArrayToLong(bytes, 0);
- this.upperPart = byteArrayToLong(bytes, SIZE_OF_LONG);
- }
-
- /**
- * Constructs a new abstract ID.
- *
- * @param lowerPart
- * the lower bytes of the ID
- * @param upperPart
- * the higher bytes of the ID
- */
- protected AbstractID(final long lowerPart, final long upperPart) {
-
- this.lowerPart = lowerPart;
- this.upperPart = upperPart;
- }
-
- /**
- * Creates a new abstract ID from the given one. The given and the newly created abtract ID will be identical, i.e.
- * a comparison by <code>equals</code> will return <code>true</code> and both objects will have the same hash code.
- *
- * @param id
- * the abstract ID to copy
- */
- protected AbstractID(final AbstractID id) {
-
- this.lowerPart = id.lowerPart;
- this.upperPart = id.upperPart;
- }
-
- /**
- * Constructs a new random ID from a uniform distribution.
- */
- public AbstractID() {
-
- this.lowerPart = generateRandomBytes();
- this.upperPart = generateRandomBytes();
- }
-
- /**
- * Generates a uniformly distributed random positive long.
- *
- * @return a uniformly distributed random positive long
- */
- protected static long generateRandomBytes() {
-
- return (long) (Math.random() * Long.MAX_VALUE);
- }
-
- /**
- * Converts the given byte array to a long.
- *
- * @param ba
- * the byte array to be converted
- * @param offset
- * the offset indicating at which byte inside the array the conversion shall begin
- * @return the long variable
- */
- private static long byteArrayToLong(final byte[] ba, final int offset) {
-
- long l = 0;
-
- for (int i = 0; i < SIZE_OF_LONG; ++i) {
- l |= (ba[offset + SIZE_OF_LONG - 1 - i] & 0xffL) << (i << 3);
- }
-
- return l;
- }
-
- /**
- * Converts a long to a byte array.
- *
- * @param l
- * the long variable to be converted
- * @param ba
- * the byte array to store the result the of the conversion
- * @param offset
- * the offset indicating at what position inside the byte array the result of the conversion shall be stored
- */
- private static void longToByteArray(final long l, final byte[] ba, final int offset) {
-
- for (int i = 0; i < SIZE_OF_LONG; ++i) {
- final int shift = i << 3; // i * 8
- ba[offset + SIZE_OF_LONG - 1 - i] = (byte) ((l & (0xffL << shift)) >>> shift);
- }
- }
-
- /**
- * Sets an ID from another ID by copying its internal byte representation.
- *
- * @param src
- * the source ID
- */
- public void setID(final AbstractID src) {
- this.lowerPart = src.lowerPart;
- this.upperPart = src.upperPart;
- }
-
-
- @Override
- public boolean equals(final Object obj) {
-
- if (!(obj instanceof AbstractID)) {
- return false;
- }
-
- final AbstractID src = (AbstractID) obj;
-
- if (src.lowerPart != this.lowerPart) {
- return false;
- }
-
- if (src.upperPart != this.upperPart) {
- return false;
- }
-
- return true;
- }
-
-
- @Override
- public int hashCode() {
-
- return (int) (this.lowerPart ^ (this.upperPart >>> 32));
- }
-
-
- @Override
- public void read(final DataInput in) throws IOException {
-
- this.lowerPart = in.readLong();
- this.upperPart = in.readLong();
- }
-
-
- @Override
- public void write(final DataOutput out) throws IOException {
-
- out.writeLong(this.lowerPart);
- out.writeLong(this.upperPart);
- }
-
-
- @Override
- public String toString() {
-
- final byte[] ba = new byte[SIZE];
- longToByteArray(this.lowerPart, ba, 0);
- longToByteArray(this.upperPart, ba, SIZE_OF_LONG);
-
- return StringUtils.byteToHexString(ba);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/AbstractRecordReader.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/AbstractRecordReader.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/AbstractRecordReader.java
deleted file mode 100644
index 3a8ed25..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/AbstractRecordReader.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io;
-
-import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
-import eu.stratosphere.nephele.event.task.EventListener;
-import eu.stratosphere.nephele.event.task.EventNotificationManager;
-
-/**
- * This is an abstract base class for a record reader, either dealing with mutable or immutable records,
- * and dealing with reads from single gates (single end points) or multiple gates (union).
- */
-public abstract class AbstractRecordReader implements ReaderBase {
-
-
- private final EventNotificationManager eventHandler = new EventNotificationManager();
-
- private int numEventsUntilEndOfSuperstep = -1;
-
- private int endOfSuperstepEventsCount;
-
- // --------------------------------------------------------------------------------------------
-
- /**
- * Subscribes the listener object to receive events of the given type.
- *
- * @param eventListener
- * the listener object to register
- * @param eventType
- * the type of event to register the listener for
- */
- @Override
- public void subscribeToEvent(EventListener eventListener, Class<? extends AbstractTaskEvent> eventType) {
- this.eventHandler.subscribeToEvent(eventListener, eventType);
- }
-
- /**
- * Removes the subscription for events of the given type for the listener object.
- *
- * @param eventListener The listener object to cancel the subscription for.
- * @param eventType The type of the event to cancel the subscription for.
- */
- @Override
- public void unsubscribeFromEvent(EventListener eventListener, Class<? extends AbstractTaskEvent> eventType) {
- this.eventHandler.unsubscribeFromEvent(eventListener, eventType);
- }
-
-
- protected void handleEvent(AbstractTaskEvent evt) {
- this.eventHandler.deliverEvent(evt);
- }
-
- @Override
- public void setIterative(int numEventsUntilEndOfSuperstep) {
- this.numEventsUntilEndOfSuperstep = numEventsUntilEndOfSuperstep;
- }
-
- @Override
- public void startNextSuperstep() {
- if (this.numEventsUntilEndOfSuperstep == -1) {
- throw new IllegalStateException("Called 'startNextSuperstep()' in a non-iterative reader.");
- }
- else if (endOfSuperstepEventsCount < numEventsUntilEndOfSuperstep) {
- throw new IllegalStateException("Premature 'startNextSuperstep()'. Not yet reached the end-of-superstep.");
- }
- this.endOfSuperstepEventsCount = 0;
- }
-
- @Override
- public boolean hasReachedEndOfSuperstep() {
- return endOfSuperstepEventsCount== numEventsUntilEndOfSuperstep;
- }
-
- protected boolean incrementEndOfSuperstepEventAndCheck() {
- if (numEventsUntilEndOfSuperstep == -1) {
- throw new IllegalStateException("Received EndOfSuperstep event in a non-iterative reader.");
- }
-
- endOfSuperstepEventsCount++;
-
- if (endOfSuperstepEventsCount > numEventsUntilEndOfSuperstep) {
- throw new IllegalStateException("Received EndOfSuperstep events beyond the number to indicate the end of the superstep");
- }
-
- return endOfSuperstepEventsCount== numEventsUntilEndOfSuperstep;
- }
-}