You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/06/09 20:30:54 UTC
[19/30] Offer buffer-oriented API for I/O (#25)
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
index 1ea5b1b..5240fc8 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
@@ -65,7 +65,6 @@ import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
import eu.stratosphere.nephele.instance.HardwareDescription;
import eu.stratosphere.nephele.instance.HardwareDescriptionFactory;
import eu.stratosphere.nephele.instance.InstanceConnectionInfo;
-import eu.stratosphere.nephele.io.channels.ChannelID;
import eu.stratosphere.nephele.ipc.RPC;
import eu.stratosphere.nephele.ipc.Server;
import eu.stratosphere.nephele.jobgraph.JobID;
@@ -80,12 +79,11 @@ import eu.stratosphere.nephele.protocols.TaskOperationProtocol;
import eu.stratosphere.nephele.services.iomanager.IOManager;
import eu.stratosphere.nephele.services.memorymanager.MemoryManager;
import eu.stratosphere.nephele.services.memorymanager.spi.DefaultMemoryManager;
-import eu.stratosphere.nephele.taskmanager.bytebuffered.ByteBufferedChannelManager;
-import eu.stratosphere.nephele.taskmanager.bytebuffered.InsufficientResourcesException;
-import eu.stratosphere.nephele.taskmanager.runtime.ExecutorThreadFactory;
-import eu.stratosphere.nephele.taskmanager.runtime.RuntimeTask;
import eu.stratosphere.nephele.util.SerializableArrayList;
import eu.stratosphere.pact.runtime.cache.FileCache;
+import eu.stratosphere.runtime.io.channels.ChannelID;
+import eu.stratosphere.runtime.io.network.ChannelManager;
+import eu.stratosphere.runtime.io.network.InsufficientResourcesException;
import eu.stratosphere.util.StringUtils;
/**
@@ -128,10 +126,10 @@ public class TaskManager implements TaskOperationProtocol {
private final InstanceConnectionInfo localInstanceConnectionInfo;
/**
- * The instance of the {@link ByteBufferedChannelManager} which is responsible for
+ * The instance of the {@link eu.stratosphere.nephele.taskmanager.io.bytebuffered.ChannelManager} which is responsible for
* setting up and cleaning up the byte buffered channels of the tasks.
*/
- private final ByteBufferedChannelManager byteBufferedChannelManager;
+ private final ChannelManager channelManager;
/**
* Instance of the task manager profile if profiling is enabled.
@@ -279,14 +277,24 @@ public class TaskManager implements TaskOperationProtocol {
final int pageSize = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY,
ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE);
+ // Initialize network buffer pool
+ int numBuffers = GlobalConfiguration.getInteger(
+ ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY,
+ ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS);
+
+ int bufferSize = GlobalConfiguration.getInteger(
+ ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY,
+ ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE);
+
// Initialize the byte buffered channel manager
+ ChannelManager channelManager = null;
try {
- this.byteBufferedChannelManager = new ByteBufferedChannelManager(this.lookupService,
- this.localInstanceConnectionInfo);
- } catch (Exception e) {
- LOG.fatal("Cannot create byte channel manager:" + e.getMessage(), e);
- throw new Exception("Failed to instantiate Byte-buffered channel manager. " + e.getMessage(), e);
+ channelManager = new ChannelManager(this.lookupService, this.localInstanceConnectionInfo, numBuffers, bufferSize);
+ } catch (IOException ioe) {
+ LOG.error(StringUtils.stringifyException(ioe));
+ throw new Exception("Failed to instantiate Byte-buffered channel manager. " + ioe.getMessage(), ioe);
}
+ this.channelManager = channelManager;
{
HardwareDescription resources = HardwareDescriptionFactory.extractFromSystem();
@@ -598,7 +606,6 @@ public class TaskManager implements TaskOperationProtocol {
return new TaskCancelResult(id, AbstractTaskResult.ReturnCode.SUCCESS);
}
-
@Override
public TaskKillResult killTask(final ExecutionVertexID id) throws IOException {
@@ -661,13 +668,11 @@ public class TaskManager implements TaskOperationProtocol {
}
final Configuration jobConfiguration = tdd.getJobConfiguration();
- final Set<ChannelID> activeOutputChannels = null; // TODO: Fix me
// Register the task
Task task;
try {
- task = createAndRegisterTask(vertexID, jobConfiguration, re,
- activeOutputChannels);
+ task = createAndRegisterTask(vertexID, jobConfiguration, re);
} catch (InsufficientResourcesException e) {
final TaskSubmissionResult result = new TaskSubmissionResult(vertexID,
AbstractTaskResult.ReturnCode.INSUFFICIENT_RESOURCES);
@@ -707,12 +712,10 @@ public class TaskManager implements TaskOperationProtocol {
* the job configuration that has been attached to the original job graph
* @param environment
* the environment of the task to be registered
- * @param activeOutputChannels
- * the set of initially active output channels
* @return the task to be started or <code>null</code> if a task with the same ID was already running
*/
private Task createAndRegisterTask(final ExecutionVertexID id, final Configuration jobConfiguration,
- final RuntimeEnvironment environment, final Set<ChannelID> activeOutputChannels)
+ final RuntimeEnvironment environment)
throws InsufficientResourcesException, IOException {
if (id == null) {
@@ -730,10 +733,10 @@ public class TaskManager implements TaskOperationProtocol {
final Task runningTask = this.runningTasks.get(id);
boolean registerTask = true;
if (runningTask == null) {
- task = new RuntimeTask(id, environment, this);
+ task = new Task(id, environment, this);
} else {
- if (runningTask instanceof RuntimeTask) {
+ if (runningTask instanceof Task) {
// Task is already running
return null;
} else {
@@ -746,7 +749,7 @@ public class TaskManager implements TaskOperationProtocol {
if (registerTask) {
// Register the task with the byte buffered channel manager
- this.byteBufferedChannelManager.register(task, activeOutputChannels);
+ this.channelManager.register(task);
boolean enableProfiling = false;
if (this.profiler != null && jobConfiguration.getBoolean(ProfilingUtils.PROFILE_JOB_KEY, true)) {
@@ -786,7 +789,7 @@ public class TaskManager implements TaskOperationProtocol {
this.fileCache.deleteTmpFile(e.getKey(), task.getJobID());
}
// Unregister task from the byte buffered channel manager
- this.byteBufferedChannelManager.unregister(id, task);
+ this.channelManager.unregister(id, task);
// Unregister task from profiling
task.unregisterProfiler(this.profiler);
@@ -892,7 +895,7 @@ public class TaskManager implements TaskOperationProtocol {
}
// Shut down the network channel manager
- this.byteBufferedChannelManager.shutdown();
+ this.channelManager.shutdown();
// Shut down the memory manager
if (this.ioManager != null) {
@@ -930,8 +933,9 @@ public class TaskManager implements TaskOperationProtocol {
}
@Override
- public void logBufferUtilization() {
- this.byteBufferedChannelManager.logBufferUtilization();
+ public void logBufferUtilization() throws IOException {
+
+ this.channelManager.logBufferUtilization();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bufferprovider/BufferAvailabilityListener.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bufferprovider/BufferAvailabilityListener.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bufferprovider/BufferAvailabilityListener.java
deleted file mode 100644
index 49e774b..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bufferprovider/BufferAvailabilityListener.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.bufferprovider;
-
-/**
- * This interface must be implemented to receive a notification from a {@link BufferProvider} when an empty
- * {@link Buffer} has
- * become available again.
- *
- */
-public interface BufferAvailabilityListener {
-
- /**
- * Indicates that at least one {@link Buffer} has become available again.
- */
- void bufferAvailable();
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bufferprovider/BufferProvider.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bufferprovider/BufferProvider.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bufferprovider/BufferProvider.java
deleted file mode 100644
index 1b47b57..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bufferprovider/BufferProvider.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.bufferprovider;
-
-import java.io.IOException;
-
-import eu.stratosphere.nephele.io.channels.Buffer;
-
-public interface BufferProvider {
-
- /**
- * Requests an empty buffer with a minimum size of <code>minimumSizeOfBuffer</code>. The method returns
- * immediately, even if the request could not be fulfilled. Note that <code>minimumSizeOfBuffer</code> must not
- * exceed the value returned by the method <code>getMaximumBufferSize()</code>.
- *
- * @param minimumSizeOfBuffer
- * the minimum size of the requested read buffer in bytes
- * @return the buffer with at least the requested size or <code>null</code> if no such buffer is currently available
- * @throws IOException
- * thrown if an I/O error occurs while allocating the buffer
- */
- Buffer requestEmptyBuffer(int minimumSizeOfBuffer) throws IOException;
-
- /**
- * Requests an empty buffer with a minimum size of <code>minimumSizeOfBuffer</code>. The method blocks
- * until the request can be fulfilled. Note that <code>minimumSizeOfBuffer</code> must not
- * exceed the value returned by the method <code>getMaximumBufferSize()</code>.
- *
- * @param minimumSizeOfBuffer
- * the minimum size of the requested read buffer in bytes
- * @return the buffer with at least the requested size
- * @throws IOException
- * thrown if an I/O error occurs while allocating the buffer
- * @throws InterruptedException
- * thrown if the thread waiting for the buffer is interrupted
- */
- Buffer requestEmptyBufferBlocking(int minimumSizeOfBuffer) throws IOException,
- InterruptedException;
-
- /**
- * Returns the maximum buffer size in bytes available at this buffer provider.
- *
- * @return the maximum buffer size in bytes available at this buffer provider
- */
- int getMaximumBufferSize();
-
- /**
- * Returns if this buffer provider is shared between different entities (for examples tasks).
- *
- * @return <code>true</code> if this buffer provider is shared, <code>false</code> otherwise
- */
- boolean isShared();
-
- /**
- * Reports an asynchronous event. Calling this method interrupts each blocking method of the buffer provider and
- * allows the blocked thread to respond to the event.
- */
- void reportAsynchronousEvent();
-
- /**
- * Registers the given {@link BufferAvailabilityListener} with an empty buffer pool to receive a notification when
- * at least one buffer has become available again. After the notification, the listener is automatically
- * unregistered again.
- *
- * @param bufferAvailabilityListener
- * the listener to be registered
- * @return <code>true</code> if the registration has been successful or <code>false</code> if the registration
- * failed because the buffer pool has not been empty or has already been destroyed
- */
- boolean registerBufferAvailabilityListener(BufferAvailabilityListener bufferAvailabilityListener);
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bufferprovider/BufferProviderBroker.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bufferprovider/BufferProviderBroker.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bufferprovider/BufferProviderBroker.java
deleted file mode 100644
index 6474386..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bufferprovider/BufferProviderBroker.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.bufferprovider;
-
-import java.io.IOException;
-
-import eu.stratosphere.nephele.io.channels.ChannelID;
-import eu.stratosphere.nephele.jobgraph.JobID;
-
-public interface BufferProviderBroker {
-
- BufferProvider getBufferProvider(JobID jobID, ChannelID sourceChannelID) throws IOException, InterruptedException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bufferprovider/GlobalBufferPool.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bufferprovider/GlobalBufferPool.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bufferprovider/GlobalBufferPool.java
deleted file mode 100644
index 0fc25eb..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bufferprovider/GlobalBufferPool.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.bufferprovider;
-
-import java.util.Queue;
-import java.util.concurrent.ArrayBlockingQueue;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import eu.stratosphere.configuration.ConfigConstants;
-import eu.stratosphere.configuration.GlobalConfiguration;
-import eu.stratosphere.core.memory.MemorySegment;
-
-public final class GlobalBufferPool {
-
- private final static Log LOG = LogFactory.getLog(GlobalBufferPool.class);
-
- /**
- * The singleton instance of the global buffer pool.
- */
- private static GlobalBufferPool instance = null;
-
- /**
- * The number of buffers created at startup.
- */
- private final int numberOfBuffers;
-
- /**
- * The size of read/write buffers in bytes.
- */
- private final int bufferSizeInBytes;
-
- private final Queue<MemorySegment> buffers;
-
- /**
- * Returns the singleton instance of the global buffer pool. If the instance does not already exist, it is also
- * created by calling this method.
- *
- * @return the singleton instance of the global buffer pool
- */
- public static synchronized GlobalBufferPool getInstance() {
-
- if (instance == null) {
- instance = new GlobalBufferPool();
- }
-
- return instance;
- }
-
- /**
- * Constructs the global buffer pool.
- */
- private GlobalBufferPool() {
-
- this.numberOfBuffers = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY,
- ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS);
- this.bufferSizeInBytes = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY,
- ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE);
-
- this.buffers = new ArrayBlockingQueue<MemorySegment>(this.numberOfBuffers);
-
- // Initialize buffers
- for (int i = 0; i < this.numberOfBuffers; i++) {
- // allocate byteBuffer
- final byte[] segMemory = new byte[this.bufferSizeInBytes];
- final MemorySegment readBuffer = new MemorySegment(segMemory);
- this.buffers.add(readBuffer);
- }
-
- LOG.info("Initialized global buffer pool with " + this.numberOfBuffers + " buffers with a size "
- + this.bufferSizeInBytes + " bytes each");
- }
-
- /**
- * Returns the maximum size of a buffer available at this pool in bytes.
- *
- * @return the maximum size of a buffer available at this pool in bytes
- */
- public int getMaximumBufferSize() {
-
- return this.bufferSizeInBytes;
- }
-
- /**
- * Locks a buffer from the global buffer pool and returns it to the caller of this method.
- *
- * @return the locked buffer from the pool or <code>null</code> if currently no global buffer is available
- */
- public MemorySegment lockGlobalBuffer() {
-
- return this.buffers.poll();
- }
-
- /**
- * Releases a lock on a previously locked buffer and returns the buffer to the global pool.
- *
- * @param releasedBuffer
- * the previously locked buffer to be released
- */
- public void releaseGlobalBuffer(final MemorySegment releasedBuffer) {
- this.buffers.add(releasedBuffer);
- }
-
- /**
- * Returns the total number of buffers managed by this pool.
- *
- * @return the total number of buffers managed by this pool
- */
- public int getTotalNumberOfBuffers() {
-
- return this.numberOfBuffers;
- }
-
- /**
- * Returns the number of buffers which are currently available at this pool.
- *
- * @return the number of buffers which are currently available at this pool
- */
- public int getCurrentNumberOfBuffers() {
-
- return this.buffers.size();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bufferprovider/LocalBufferPool.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bufferprovider/LocalBufferPool.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bufferprovider/LocalBufferPool.java
deleted file mode 100644
index f296003..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bufferprovider/LocalBufferPool.java
+++ /dev/null
@@ -1,287 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.bufferprovider;
-
-import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.Queue;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import eu.stratosphere.core.memory.MemorySegment;
-import eu.stratosphere.nephele.io.channels.Buffer;
-import eu.stratosphere.nephele.io.channels.BufferFactory;
-import eu.stratosphere.nephele.io.channels.MemoryBufferPoolConnector;
-
-public final class LocalBufferPool implements BufferProvider {
-
- private static final class LocalBufferPoolConnector implements MemoryBufferPoolConnector {
-
- private final LocalBufferPool localBufferPool;
-
- private LocalBufferPoolConnector(final LocalBufferPool localBufferPool) {
- this.localBufferPool = localBufferPool;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void recycle(final MemorySegment byteBuffer) {
-
- this.localBufferPool.recycleBuffer(byteBuffer);
- }
-
- }
-
- private final static Log LOG = LogFactory.getLog(LocalBufferPool.class);
-
- private final GlobalBufferPool globalBufferPool;
-
- private final int maximumBufferSize;
-
- private int designatedNumberOfBuffers;
-
- private int requestedNumberOfBuffers = 0;
-
- private final boolean isShared;
-
- private boolean asynchronousEventOccurred = false;
-
- private boolean isDestroyed = false;
-
- private final Queue<MemorySegment> buffers = new ArrayDeque<MemorySegment>();
-
- private final LocalBufferPoolConnector bufferPoolConnector;
-
- private final Queue<BufferAvailabilityListener> bufferAvailabilityListenerQueue = new ArrayDeque<BufferAvailabilityListener>();
-
- public LocalBufferPool(final int designatedNumberOfBuffers, final boolean isShared) {
-
- this.globalBufferPool = GlobalBufferPool.getInstance();
- this.maximumBufferSize = this.globalBufferPool.getMaximumBufferSize();
- this.designatedNumberOfBuffers = designatedNumberOfBuffers;
- this.isShared = isShared;
- this.bufferPoolConnector = new LocalBufferPoolConnector(this);
- }
-
-
- @Override
- public Buffer requestEmptyBuffer(final int minimumSizeOfBuffer) throws IOException {
-
- try {
- return requestBufferInternal(minimumSizeOfBuffer, false);
- } catch (InterruptedException e) {
- LOG.error("Caught unexpected InterruptedException");
- }
-
- return null;
- }
-
-
- @Override
- public Buffer requestEmptyBufferBlocking(final int minimumSizeOfBuffer) throws IOException, InterruptedException {
-
- return requestBufferInternal(minimumSizeOfBuffer, true);
- }
-
- private Buffer requestBufferInternal(final int minimumSizeOfBuffer, final boolean block) throws IOException,
- InterruptedException {
-
- if (minimumSizeOfBuffer > this.maximumBufferSize) {
- throw new IllegalArgumentException("Buffer of " + minimumSizeOfBuffer
- + " bytes is requested, but maximum buffer size is " + this.maximumBufferSize);
- }
-
- while (true) {
-
- boolean async = false;
-
- synchronized (this.buffers) {
-
- // Make sure we return excess buffers immediately
- while (this.requestedNumberOfBuffers > this.designatedNumberOfBuffers) {
-
- final MemorySegment seg = this.buffers.poll();
- if (seg == null) {
- break;
- }
-
- this.globalBufferPool.releaseGlobalBuffer(seg);
- this.requestedNumberOfBuffers--;
- }
-
- while (this.buffers.isEmpty()) {
-
- // Check if the number of cached buffers matches the number of designated buffers
- if (this.requestedNumberOfBuffers < this.designatedNumberOfBuffers) {
-
- final MemorySegment memSeg = this.globalBufferPool.lockGlobalBuffer();
- if (memSeg != null) {
- this.buffers.add(memSeg);
- this.requestedNumberOfBuffers++;
- continue;
- }
- }
-
- if (this.asynchronousEventOccurred && block) {
- this.asynchronousEventOccurred = false;
- async = true;
- break;
- }
-
- if (block) {
- this.buffers.wait(100);
- } else {
- return null;
- }
- }
-
- if (!async) {
- final MemorySegment memSeg = this.buffers.poll();
- return BufferFactory.createFromMemory(minimumSizeOfBuffer, memSeg, this.bufferPoolConnector);
- }
- }
- }
- }
-
-
- @Override
- public int getMaximumBufferSize() {
-
- return this.maximumBufferSize;
- }
-
- /**
- * Sets the designated number of buffers for this local buffer cache.
- *
- * @param designatedNumberOfBuffers
- * the designated number of buffers for this local buffer cache
- */
- public void setDesignatedNumberOfBuffers(final int designatedNumberOfBuffers) {
-
- synchronized (this.buffers) {
-
- this.designatedNumberOfBuffers = designatedNumberOfBuffers;
-
- // Make sure we return excess buffers immediately
- while (this.requestedNumberOfBuffers > this.designatedNumberOfBuffers) {
-
- if (this.buffers.isEmpty()) {
- break;
- }
-
- this.globalBufferPool.releaseGlobalBuffer(this.buffers.poll());
- this.requestedNumberOfBuffers--;
- }
-
- this.buffers.notify();
- }
- }
-
- public void destroy() {
-
- synchronized (this.buffers) {
-
- if (this.isDestroyed) {
- LOG.error("destroy is called on LocalBufferPool multiple times");
- return;
- }
-
- this.isDestroyed = true;
-
- while (!this.buffers.isEmpty()) {
- this.globalBufferPool.releaseGlobalBuffer(this.buffers.poll());
- }
-
- this.requestedNumberOfBuffers = 0;
- }
- }
-
-
- @Override
- public boolean isShared() {
-
- return this.isShared;
- }
-
- public int getNumberOfAvailableBuffers() {
-
- synchronized (this.buffers) {
- return this.buffers.size();
- }
- }
-
- public int getDesignatedNumberOfBuffers() {
-
- synchronized (this.buffers) {
- return this.designatedNumberOfBuffers;
- }
- }
-
- public int getRequestedNumberOfBuffers() {
-
- synchronized (this.buffers) {
- return this.requestedNumberOfBuffers;
- }
- }
-
- private void recycleBuffer(final MemorySegment memSeg) {
-
- synchronized (this.buffers) {
-
- if (this.isDestroyed) {
- this.globalBufferPool.releaseGlobalBuffer(memSeg);
- this.requestedNumberOfBuffers--;
- } else {
- this.buffers.add(memSeg);
- this.buffers.notify();
- }
-
- while (!this.bufferAvailabilityListenerQueue.isEmpty()) {
- this.bufferAvailabilityListenerQueue.poll().bufferAvailable();
- }
- }
- }
-
-
- @Override
- public void reportAsynchronousEvent() {
-
- synchronized (this.buffers) {
- this.asynchronousEventOccurred = true;
- this.buffers.notify();
- }
- }
-
-
- @Override
- public boolean registerBufferAvailabilityListener(final BufferAvailabilityListener bufferAvailabilityListener) {
-
- synchronized (this.buffers) {
- if (!this.buffers.isEmpty()) {
- return false;
- }
-
- if (this.isDestroyed) {
- return false;
- }
-
- this.bufferAvailabilityListenerQueue.add(bufferAvailabilityListener);
- }
-
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bufferprovider/LocalBufferPoolOwner.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bufferprovider/LocalBufferPoolOwner.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bufferprovider/LocalBufferPoolOwner.java
deleted file mode 100644
index c56a73c..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bufferprovider/LocalBufferPoolOwner.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.bufferprovider;
-
-/**
- * A local buffer pool owner is an object which initially retrieves its buffers from the {@link GlobalBufferPool} and
- * manages its fraction of the overall buffer pool locally by means of a {@link LocalBufferPool}.
- *
- */
-public interface LocalBufferPoolOwner {
-
- /**
- * Returns the number of byte-buffered channels that will retrieve their buffers from the local buffer pool.
- *
- * @return the number of byte-buffered channels that will retrieve their buffers from the local buffer pool
- */
- int getNumberOfChannels();
-
- /**
- * Sets the designated number of buffers the local buffer pool owner is allowed to fetch from the global buffer pool
- * and manage locally by means of the {@link LocalBufferPool}.
- *
- * @param numberOfBuffers
- * the numberOfBuffers the local buffer pool owner is allowed to fetch from the global buffer pool
- */
- void setDesignatedNumberOfBuffers(int numberOfBuffers);
-
- /**
- * Clears the local buffer pool and returns all buffers to the global buffer pool.
- */
- void clearLocalBufferPool();
-
- /**
- * Logs the current status of the local buffer pool. This method is intended mainly for debugging purposes.
- */
- void logBufferUtilization();
-
- /**
- * Reports an asynchronous event. Calling this method interrupts each blocking method of the buffer pool owner and
- * allows the blocked thread to respond to the event.
- */
- void reportAsynchronousEvent();
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/AbstractOutputChannelContext.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/AbstractOutputChannelContext.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/AbstractOutputChannelContext.java
deleted file mode 100644
index 066c268..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/AbstractOutputChannelContext.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.bytebuffered;
-
-import java.util.Iterator;
-
-import eu.stratosphere.nephele.event.task.AbstractEvent;
-import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
-import eu.stratosphere.nephele.taskmanager.transferenvelope.TransferEnvelope;
-
-public abstract class AbstractOutputChannelContext implements OutputChannelContext {
-
- /**
- * The forwarding chain used by this output channel context.
- */
- private final OutputChannelForwardingChain forwardingChain;
-
- public AbstractOutputChannelContext(final OutputChannelForwardingChain forwardingChain) {
-
- this.forwardingChain = forwardingChain;
- }
-
-
- @Override
- public void queueTransferEnvelope(final TransferEnvelope transferEnvelope) {
-
- if (transferEnvelope.getBuffer() != null) {
- throw new IllegalStateException("Transfer envelope for output channel has buffer attached");
- }
-
- final Iterator<AbstractEvent> it = transferEnvelope.getEventList().iterator();
- while (it.hasNext()) {
-
- final AbstractEvent event = it.next();
- if (event instanceof AbstractTaskEvent) {
- processEventAsynchronously(event);
- } else {
- processEventSynchronously(event);
- }
- }
- }
-
- /**
- * Processes an event received from the framework in a synchronous fashion, i.e. the event processing is done by the
- * thread the event is destined for (usually the task thread).
- *
- * @param event
- * the event to be processed
- */
- protected void processEventSynchronously(final AbstractEvent event) {
-
- this.forwardingChain.offerEvent(event);
- }
-
- /**
- * Processes an event received from the framework in an asynchronous fashion, i.e. the event processing is done by
- * the thread which delivers the event.
- *
- * @param event
- * the event to be processed
- */
- protected void processEventAsynchronously(final AbstractEvent event) {
-
- // The default implementation does nothing
- }
-
-
- @Override
- public void destroy() {
-
- this.forwardingChain.destroy();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/AbstractOutputChannelForwarder.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/AbstractOutputChannelForwarder.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/AbstractOutputChannelForwarder.java
deleted file mode 100644
index 4a59cdc..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/AbstractOutputChannelForwarder.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.bytebuffered;
-
-import java.io.IOException;
-
-import eu.stratosphere.nephele.event.task.AbstractEvent;
-import eu.stratosphere.nephele.io.channels.Buffer;
-import eu.stratosphere.nephele.taskmanager.transferenvelope.TransferEnvelope;
-
-/**
- * An output channel forwarder is a component which processes a {@link TransferEnvelope} after it has been produced by
- * an {@link AbstractByteBufferedOutputChannel}. The component can decide based on the transfer envelope whether to
- * forward the envelope, discard it, or to store it.
- *
- */
-public abstract class AbstractOutputChannelForwarder {
-
- private final AbstractOutputChannelForwarder next;
-
- private volatile AbstractOutputChannelForwarder prev = null;
-
- protected AbstractOutputChannelForwarder(final AbstractOutputChannelForwarder next) {
- this.next = next;
- if (this.next != null) {
- this.next.prev = this;
- }
- }
-
- /**
- * Called by the framework to push a produced transfer envelope towards its receiver. This method will always be
- * called by the task thread itself.
- *
- * @param transferEnvelope
- * the transfer envelope to be processed
- * @throws IOException
- * thrown if an I/O error occurs while processing the transfer envelope
- * @throws InterruptedException
- * thrown if the task thread was interrupted while processing the transfer envelope
- */
- public void push(final TransferEnvelope transferEnvelope) throws IOException, InterruptedException {
-
- if (this.next != null) {
- this.next.push(transferEnvelope);
- }
- }
-
- public TransferEnvelope pull() {
-
- if (this.prev != null) {
- return this.prev.pull();
- }
-
- return null;
- }
-
- public boolean hasDataLeft() throws IOException, InterruptedException {
-
- if (this.next != null) {
- this.next.hasDataLeft();
- }
-
- return false;
- }
-
- public void processEvent(final AbstractEvent event) {
-
- if (this.next != null) {
- this.next.processEvent(event);
- }
- }
-
- public void destroy() {
-
- if (this.next != null) {
- this.next.destroy();
- }
- }
-
- protected final void recycleTransferEnvelope(final TransferEnvelope transferEnvelope) {
-
- final Buffer buffer = transferEnvelope.getBuffer();
- if (buffer != null) {
- buffer.recycleBuffer();
-
- }
- }
-
- protected final AbstractOutputChannelForwarder getNext() {
-
- return this.next;
- }
-
- protected final AbstractOutputChannelForwarder getPrev() {
-
- return this.prev;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/ByteBufferedChannelManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/ByteBufferedChannelManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/ByteBufferedChannelManager.java
deleted file mode 100644
index 3bf351d..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/ByteBufferedChannelManager.java
+++ /dev/null
@@ -1,816 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.bytebuffered;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import eu.stratosphere.configuration.GlobalConfiguration;
-import eu.stratosphere.nephele.execution.Environment;
-import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
-import eu.stratosphere.nephele.instance.InstanceConnectionInfo;
-import eu.stratosphere.nephele.io.AbstractID;
-import eu.stratosphere.nephele.io.GateID;
-import eu.stratosphere.nephele.io.channels.Buffer;
-import eu.stratosphere.nephele.io.channels.ChannelID;
-import eu.stratosphere.nephele.io.channels.ChannelType;
-import eu.stratosphere.nephele.jobgraph.JobID;
-import eu.stratosphere.nephele.protocols.ChannelLookupProtocol;
-import eu.stratosphere.nephele.taskmanager.Task;
-import eu.stratosphere.nephele.taskmanager.bufferprovider.BufferProvider;
-import eu.stratosphere.nephele.taskmanager.bufferprovider.BufferProviderBroker;
-import eu.stratosphere.nephele.taskmanager.bufferprovider.GlobalBufferPool;
-import eu.stratosphere.nephele.taskmanager.bufferprovider.LocalBufferPool;
-import eu.stratosphere.nephele.taskmanager.bufferprovider.LocalBufferPoolOwner;
-import eu.stratosphere.nephele.taskmanager.transferenvelope.TransferEnvelope;
-import eu.stratosphere.nephele.taskmanager.transferenvelope.TransferEnvelopeDispatcher;
-import eu.stratosphere.nephele.taskmanager.transferenvelope.TransferEnvelopeReceiverList;
-
-public final class ByteBufferedChannelManager implements TransferEnvelopeDispatcher, BufferProviderBroker {
-
- /**
- * The log object used to report problems and errors.
- */
- private static final Log LOG = LogFactory.getLog(ByteBufferedChannelManager.class);
-
- private static final boolean DEFAULT_ALLOW_SENDER_SIDE_SPILLING = false;
-
- private static final boolean DEFAULT_MERGE_SPILLED_BUFFERS = true;
-
- // TODO: Make this configurable
- private static final int NUMBER_OF_CHANNELS_FOR_MULTICAST = 10;
-
- private final Map<ChannelID, ChannelContext> registeredChannels = new ConcurrentHashMap<ChannelID, ChannelContext>();
-
- private final Map<AbstractID, LocalBufferPoolOwner> localBufferPoolOwner = new ConcurrentHashMap<AbstractID, LocalBufferPoolOwner>();
-
- private final NetworkConnectionManager networkConnectionManager;
-
- private final ChannelLookupProtocol channelLookupService;
-
- private final InstanceConnectionInfo localConnectionInfo;
-
- private final LocalBufferPool transitBufferPool;
-
- private final boolean allowSenderSideSpilling;
-
- private final boolean mergeSpilledBuffers;
-
- private final boolean multicastEnabled = true;
-
- /**
- * This map caches transfer envelope receiver lists.
- */
- private final Map<ChannelID, TransferEnvelopeReceiverList> receiverCache = new ConcurrentHashMap<ChannelID, TransferEnvelopeReceiverList>();
-
- public ByteBufferedChannelManager(final ChannelLookupProtocol channelLookupService,
- final InstanceConnectionInfo localInstanceConnectionInfo) throws IOException {
-
- this.channelLookupService = channelLookupService;
-
- this.localConnectionInfo = localInstanceConnectionInfo;
-
- // Initialize the global buffer pool
- GlobalBufferPool.getInstance();
-
- // Initialize the transit buffer pool
- this.transitBufferPool = new LocalBufferPool(128, true);
-
- this.networkConnectionManager = new NetworkConnectionManager(this,
- localInstanceConnectionInfo.getAddress(), localInstanceConnectionInfo.getDataPort());
-
- this.allowSenderSideSpilling = GlobalConfiguration.getBoolean("channel.network.allowSenderSideSpilling",
- DEFAULT_ALLOW_SENDER_SIDE_SPILLING);
-
- this.mergeSpilledBuffers = GlobalConfiguration.getBoolean("channel.network.mergeSpilledBuffers",
- DEFAULT_MERGE_SPILLED_BUFFERS);
-
- LOG.info("Initialized byte buffered channel manager with sender-side spilling "
- + (this.allowSenderSideSpilling ? "enabled" : "disabled")
- + (this.mergeSpilledBuffers ? " and spilled buffer merging enabled" : ""));
- }
-
- /**
- * Registers the given task with the byte buffered channel manager.
- *
- * @param task
- * the task to be registered
- * @param the
- * set of output channels which are initially active
- * @throws InsufficientResourcesException
- * thrown if the channel manager does not have enough memory buffers to safely run this task
- */
- public void register(final Task task, final Set<ChannelID> activeOutputChannels)
- throws InsufficientResourcesException {
-
- // Check if we can safely run this task with the given resources
- checkBufferAvailability(task);
-
- final Environment environment = task.getEnvironment();
-
- final TaskContext taskContext = task.createTaskContext(this,
- this.localBufferPoolOwner.remove(task.getVertexID()));
-
- final Set<GateID> outputGateIDs = environment.getOutputGateIDs();
- for (final Iterator<GateID> gateIt = outputGateIDs.iterator(); gateIt.hasNext();) {
-
- final GateID gateID = gateIt.next();
- final OutputGateContext outputGateContext = taskContext.createOutputGateContext(gateID);
- final Set<ChannelID> outputChannelIDs = environment.getOutputChannelIDsOfGate(gateID);
- for (final Iterator<ChannelID> channelIt = outputChannelIDs.iterator(); channelIt.hasNext();) {
-
- final ChannelID channelID = channelIt.next();
- final OutputChannelContext previousContext = (OutputChannelContext) this.registeredChannels
- .get(channelID);
-
- final boolean isActive = true;/* activeOutputChannels.contains(channelID); */
-
- final OutputChannelContext outputChannelContext = outputGateContext.createOutputChannelContext(
- channelID, previousContext, isActive, this.mergeSpilledBuffers);
-
- // Add routing entry to receiver cache to reduce latency
- if (outputChannelContext.getType() == ChannelType.INMEMORY) {
- addReceiverListHint(outputChannelContext.getChannelID(),
- outputChannelContext.getConnectedChannelID());
- }
-
- // Add routing entry to receiver cache to save lookup for data arriving at the output channel
- if (outputChannelContext.getType() == ChannelType.NETWORK) {
- addReceiverListHint(outputChannelContext.getConnectedChannelID(),
- outputChannelContext.getChannelID());
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Registering byte buffered output channel " + outputChannelContext.getChannelID() + " ("
- + (isActive ? "active" : "inactive") + ")");
- }
-
- this.registeredChannels.put(outputChannelContext.getChannelID(), outputChannelContext);
- }
- }
-
- final Set<GateID> inputGateIDs = environment.getInputGateIDs();
- for (final Iterator<GateID> gateIt = inputGateIDs.iterator(); gateIt.hasNext();) {
-
- final GateID gateID = gateIt.next();
- final InputGateContext inputGateContext = taskContext.createInputGateContext(gateID);
- final Set<ChannelID> inputChannelIDs = environment.getInputChannelIDsOfGate(gateID);
- for (final Iterator<ChannelID> channelIt = inputChannelIDs.iterator(); channelIt.hasNext();) {
-
- final ChannelID channelID = channelIt.next();
- final InputChannelContext previousContext = (InputChannelContext) this.registeredChannels
- .get(channelID);
-
- final InputChannelContext inputChannelContext = inputGateContext.createInputChannelContext(
- channelID, previousContext);
-
- // Add routing entry to receiver cache to reduce latency
- if (inputChannelContext.getType() == ChannelType.INMEMORY) {
- addReceiverListHint(inputChannelContext.getChannelID(), inputChannelContext.getConnectedChannelID());
- }
-
- this.registeredChannels.put(inputChannelContext.getChannelID(), inputChannelContext);
- }
-
- // Add input gate context to set of local buffer pool owner
- final LocalBufferPoolOwner bufferPoolOwner = inputGateContext.getLocalBufferPoolOwner();
- if (bufferPoolOwner != null) {
- this.localBufferPoolOwner.put(inputGateContext.getGateID(), bufferPoolOwner);
- }
-
- }
-
- this.localBufferPoolOwner.put(task.getVertexID(), taskContext);
-
- redistributeGlobalBuffers();
- }
-
- /**
- * Unregisters the given task from the byte buffered channel manager.
- *
- * @param vertexID
- * the ID of the task to be unregistered
- * @param task
- * the task to be unregistered
- */
- public void unregister(final ExecutionVertexID vertexID, final Task task) {
-
- final Environment environment = task.getEnvironment();
-
- Iterator<ChannelID> channelIterator = environment.getOutputChannelIDs().iterator();
-
- while (channelIterator.hasNext()) {
-
- final ChannelID outputChannelID = channelIterator.next();
- final ChannelContext context = this.registeredChannels.remove(outputChannelID);
- if (context != null) {
- context.destroy();
- }
- this.receiverCache.remove(outputChannelID);
- }
-
- channelIterator = environment.getInputChannelIDs().iterator();
-
- while (channelIterator.hasNext()) {
-
- final ChannelID outputChannelID = channelIterator.next();
- final ChannelContext context = this.registeredChannels.remove(outputChannelID);
- if (context != null) {
- context.destroy();
- }
- this.receiverCache.remove(outputChannelID);
- }
-
- final Iterator<GateID> inputGateIterator = environment.getInputGateIDs().iterator();
-
- while (inputGateIterator.hasNext()) {
-
- final GateID inputGateID = inputGateIterator.next();
-
- final LocalBufferPoolOwner owner = this.localBufferPoolOwner.remove(inputGateID);
- if (owner != null) {
- owner.clearLocalBufferPool();
- }
- }
-
- final LocalBufferPoolOwner owner = this.localBufferPoolOwner.remove(vertexID);
- if (owner != null) {
- owner.clearLocalBufferPool();
- }
-
- redistributeGlobalBuffers();
- }
-
- /**
- * Shuts down the byte buffered channel manager and stops all its internal processes.
- */
- public void shutdown() {
-
- this.networkConnectionManager.shutDown();
- }
-
- public NetworkConnectionManager getNetworkConnectionManager() {
-
- return this.networkConnectionManager;
- }
-
- private void recycleBuffer(final TransferEnvelope envelope) {
-
- final Buffer buffer = envelope.getBuffer();
- if (buffer != null) {
- buffer.recycleBuffer();
- }
- }
-
- private void sendReceiverNotFoundEvent(final TransferEnvelope envelope, final ChannelID receiver)
- throws IOException, InterruptedException {
-
- if (ReceiverNotFoundEvent.isReceiverNotFoundEvent(envelope)) {
-
- LOG.info("Dropping request to send ReceiverNotFoundEvent as response to ReceiverNotFoundEvent");
- return;
- }
-
- final JobID jobID = envelope.getJobID();
-
- final TransferEnvelope transferEnvelope = ReceiverNotFoundEvent.createEnvelopeWithEvent(jobID, receiver,
- envelope.getSequenceNumber());
-
- final TransferEnvelopeReceiverList receiverList = getReceiverList(jobID, receiver);
- if (receiverList == null) {
- return;
- }
-
- processEnvelopeEnvelopeWithoutBuffer(transferEnvelope, receiverList);
- }
-
- private void processEnvelope(final TransferEnvelope transferEnvelope, final boolean freeSourceBuffer)
- throws IOException, InterruptedException {
-
- TransferEnvelopeReceiverList receiverList = null;
- try {
- receiverList = getReceiverList(transferEnvelope.getJobID(),
- transferEnvelope.getSource());
- } catch (InterruptedException e) {
- recycleBuffer(transferEnvelope);
- throw e;
- } catch (IOException e) {
- recycleBuffer(transferEnvelope);
- throw e;
- }
-
- if (receiverList == null) {
- recycleBuffer(transferEnvelope);
- return;
- }
-
- // This envelope is known to have either no buffer or an memory-based input buffer
- if (transferEnvelope.getBuffer() == null) {
- processEnvelopeEnvelopeWithoutBuffer(transferEnvelope, receiverList);
- } else {
- processEnvelopeWithBuffer(transferEnvelope, receiverList, freeSourceBuffer);
- }
- }
-
- private void processEnvelopeWithBuffer(final TransferEnvelope transferEnvelope,
- final TransferEnvelopeReceiverList receiverList, final boolean freeSourceBuffer)
- throws IOException, InterruptedException {
-
- // Handle the most common (unicast) case first
- if (!freeSourceBuffer) {
-
- final List<ChannelID> localReceivers = receiverList.getLocalReceivers();
- if (localReceivers.size() != 1) {
- LOG.error("Expected receiver list to have exactly one element");
- }
-
- final ChannelID localReceiver = localReceivers.get(0);
-
- final ChannelContext cc = this.registeredChannels.get(localReceiver);
- if (cc == null) {
-
- try {
- sendReceiverNotFoundEvent(transferEnvelope, localReceiver);
- } finally {
- recycleBuffer(transferEnvelope);
- }
- return;
- }
-
- if (!cc.isInputChannel()) {
- LOG.error("Local receiver " + localReceiver
- + " is not an input channel, but is supposed to accept a buffer");
- }
-
- cc.queueTransferEnvelope(transferEnvelope);
-
- return;
- }
-
- // This is the in-memory or multicast case
- final Buffer srcBuffer = transferEnvelope.getBuffer();
-
- try {
-
- if (receiverList.hasLocalReceivers()) {
-
- final List<ChannelID> localReceivers = receiverList.getLocalReceivers();
-
- for (final ChannelID localReceiver : localReceivers) {
-
- final ChannelContext cc = this.registeredChannels.get(localReceiver);
- if (cc == null) {
-
- sendReceiverNotFoundEvent(transferEnvelope, localReceiver);
- continue;
- }
-
- if (!cc.isInputChannel()) {
- LOG.error("Local receiver " + localReceiver
- + " is not an input channel, but is supposed to accept a buffer");
- continue;
- }
-
- final InputChannelContext inputChannelContext = (InputChannelContext) cc;
-
- Buffer destBuffer = null;
- try {
- destBuffer = inputChannelContext.requestEmptyBufferBlocking(srcBuffer.size());
- srcBuffer.copyToBuffer(destBuffer);
- } catch (IOException e) {
- if (destBuffer != null) {
- destBuffer.recycleBuffer();
- }
- throw e;
- }
- // TODO: See if we can save one duplicate step here
- final TransferEnvelope dup = transferEnvelope.duplicateWithoutBuffer();
- dup.setBuffer(destBuffer);
- inputChannelContext.queueTransferEnvelope(dup);
- }
- }
-
- if (receiverList.hasRemoteReceivers()) {
-
- final List<RemoteReceiver> remoteReceivers = receiverList.getRemoteReceivers();
-
- // Generate sender hint before sending the first envelope over the network
- if (transferEnvelope.getSequenceNumber() == 0) {
- generateSenderHint(transferEnvelope, remoteReceivers);
- }
-
- for (final RemoteReceiver remoteReceiver : remoteReceivers) {
- TransferEnvelope dup = transferEnvelope.duplicate();
- this.networkConnectionManager.queueEnvelopeForTransfer(remoteReceiver, dup);
- }
- }
- } finally {
- // Recycle the source buffer
- srcBuffer.recycleBuffer();
- }
- }
-
- private void processEnvelopeEnvelopeWithoutBuffer(final TransferEnvelope transferEnvelope,
- final TransferEnvelopeReceiverList receiverList) throws IOException, InterruptedException {
-
- // No need to copy anything
- final Iterator<ChannelID> localIt = receiverList.getLocalReceivers().iterator();
-
- while (localIt.hasNext()) {
-
- final ChannelID localReceiver = localIt.next();
-
- final ChannelContext channelContext = this.registeredChannels.get(localReceiver);
- if (channelContext == null) {
- sendReceiverNotFoundEvent(transferEnvelope, localReceiver);
- continue;
- }
- channelContext.queueTransferEnvelope(transferEnvelope);
- }
-
- if (!receiverList.hasRemoteReceivers()) {
- return;
- }
-
- // Generate sender hint before sending the first envelope over the network
- final List<RemoteReceiver> remoteReceivers = receiverList.getRemoteReceivers();
- if (transferEnvelope.getSequenceNumber() == 0) {
- generateSenderHint(transferEnvelope, remoteReceivers);
- }
-
- final Iterator<RemoteReceiver> remoteIt = remoteReceivers.iterator();
-
- while (remoteIt.hasNext()) {
-
- final RemoteReceiver remoteReceiver = remoteIt.next();
- this.networkConnectionManager.queueEnvelopeForTransfer(remoteReceiver, transferEnvelope);
- }
- }
-
- private void addReceiverListHint(final ChannelID source, final ChannelID localReceiver) {
-
- final TransferEnvelopeReceiverList receiverList = new TransferEnvelopeReceiverList(localReceiver);
-
- if (this.receiverCache.put(source, receiverList) != null) {
- LOG.warn("Receiver cache already contained entry for " + source);
- }
- }
-
- private void addReceiverListHint(final ChannelID source, final RemoteReceiver remoteReceiver) {
-
- final TransferEnvelopeReceiverList receiverList = new TransferEnvelopeReceiverList(remoteReceiver);
-
- if (this.receiverCache.put(source, receiverList) != null) {
- LOG.warn("Receiver cache already contained entry for " + source);
- }
- }
-
- private void generateSenderHint(final TransferEnvelope transferEnvelope, final List<RemoteReceiver> remoteReceivers) {
-
- final ChannelContext channelContext = this.registeredChannels.get(transferEnvelope.getSource());
- if (channelContext == null) {
- LOG.error("Cannot find channel context for channel ID " + transferEnvelope.getSource());
- return;
- }
-
- // Only generate sender hints for output channels
- if (channelContext.isInputChannel()) {
- return;
- }
-
- final ChannelID remoteSourceID = channelContext.getConnectedChannelID();
- final int connectionIndex = remoteReceivers.get(0).getConnectionIndex();
- final InetSocketAddress isa = new InetSocketAddress(this.localConnectionInfo.getAddress(),
- this.localConnectionInfo.getDataPort());
-
- final RemoteReceiver remoteReceiver = new RemoteReceiver(isa, connectionIndex);
- final TransferEnvelope senderHint = SenderHintEvent.createEnvelopeWithEvent(transferEnvelope, remoteSourceID,
- remoteReceiver);
-
- final Iterator<RemoteReceiver> remoteIt = remoteReceivers.iterator();
-
- while (remoteIt.hasNext()) {
-
- final RemoteReceiver rr = remoteIt.next();
- this.networkConnectionManager.queueEnvelopeForTransfer(rr, senderHint);
- }
- }
-
- /**
- * Returns the list of receivers for transfer envelopes produced by the channel with the given source channel ID.
- *
- * @param jobID
- * the ID of the job the given channel ID belongs to
- * @param sourceChannelID
- * the source channel ID for which the receiver list shall be retrieved
- * @return the list of receivers or <code>null</code> if the receiver could not be determined
- * @throws IOException
- * @throws InterruptedExcption
- */
- private TransferEnvelopeReceiverList getReceiverList(final JobID jobID, final ChannelID sourceChannelID)
- throws IOException, InterruptedException {
-
- TransferEnvelopeReceiverList receiverList = this.receiverCache.get(sourceChannelID);
-
- if (receiverList != null) {
- return receiverList;
- }
-
- while (true) {
-
- if (Thread.currentThread().isInterrupted()) {
- break;
- }
-
- ConnectionInfoLookupResponse lookupResponse;
- synchronized (this.channelLookupService) {
- lookupResponse = this.channelLookupService.lookupConnectionInfo(
- this.localConnectionInfo, jobID, sourceChannelID);
- }
-
- if (lookupResponse.isJobAborting()) {
- break;
- }
-
- if (lookupResponse.receiverNotFound()) {
- LOG.error("Cannot find task(s) waiting for data from source channel with ID " + sourceChannelID);
- break;
- }
-
- if (lookupResponse.receiverNotReady()) {
- Thread.sleep(500);
- continue;
- }
-
- if (lookupResponse.receiverReady()) {
- receiverList = new TransferEnvelopeReceiverList(lookupResponse);
- break;
- }
-
- }
-
- if (receiverList != null) {
-
- this.receiverCache.put(sourceChannelID, receiverList);
-
- if (LOG.isDebugEnabled()) {
-
- final StringBuilder sb = new StringBuilder();
- sb.append("Receiver list for source channel ID " + sourceChannelID + " at task manager "
- + this.localConnectionInfo + "\n");
-
- if (receiverList.hasLocalReceivers()) {
- sb.append("\tLocal receivers:\n");
- final Iterator<ChannelID> it = receiverList.getLocalReceivers().iterator();
- while (it.hasNext()) {
- sb.append("\t\t" + it.next() + "\n");
- }
- }
-
- if (receiverList.hasRemoteReceivers()) {
- sb.append("Remote receivers:\n");
- final Iterator<RemoteReceiver> it = receiverList.getRemoteReceivers().iterator();
- while (it.hasNext()) {
- sb.append("\t\t" + it.next() + "\n");
- }
- }
-
- LOG.debug(sb.toString());
- }
- }
-
- return receiverList;
- }
-
-
- @Override
- public void processEnvelopeFromOutputChannel(final TransferEnvelope transferEnvelope) throws IOException,
- InterruptedException {
-
- processEnvelope(transferEnvelope, true);
- }
-
-
- @Override
- public void processEnvelopeFromInputChannel(final TransferEnvelope transferEnvelope) throws IOException,
- InterruptedException {
-
- processEnvelope(transferEnvelope, false);
- }
-
-
- @Override
- public void processEnvelopeFromNetwork(final TransferEnvelope transferEnvelope, boolean freeSourceBuffer)
- throws IOException, InterruptedException {
-
- // Check if the envelope is the special envelope with the sender hint event
- if (SenderHintEvent.isSenderHintEvent(transferEnvelope)) {
-
- // Check if this is the final destination of the sender hint event before adding it
- final SenderHintEvent seh = (SenderHintEvent) transferEnvelope.getEventList().get(0);
- if (this.registeredChannels.get(seh.getSource()) != null) {
-
- addReceiverListHint(seh.getSource(), seh.getRemoteReceiver());
- return;
- }
- }
-
- processEnvelope(transferEnvelope, freeSourceBuffer);
- }
-
- /**
- * Triggers the byte buffer channel manager write the current utilization of its read and write buffers to the logs.
- * This method is primarily for debugging purposes.
- */
- public void logBufferUtilization() {
-
- System.out.println("Buffer utilization at " + System.currentTimeMillis());
-
- System.out.println("\tUnused global buffers: " + GlobalBufferPool.getInstance().getCurrentNumberOfBuffers());
-
- System.out.println("\tLocal buffer pool status:");
-
- final Iterator<LocalBufferPoolOwner> it = this.localBufferPoolOwner.values().iterator();
- while (it.hasNext()) {
- it.next().logBufferUtilization();
- }
-
- this.networkConnectionManager.logBufferUtilization();
-
- System.out.println("\tIncoming connections:");
-
- final Iterator<Map.Entry<ChannelID, ChannelContext>> it2 = this.registeredChannels.entrySet()
- .iterator();
-
- while (it2.hasNext()) {
-
- final Map.Entry<ChannelID, ChannelContext> entry = it2.next();
- final ChannelContext context = entry.getValue();
- if (context.isInputChannel()) {
-
- final InputChannelContext inputChannelContext = (InputChannelContext) context;
- inputChannelContext.logQueuedEnvelopes();
- }
- }
- }
-
-
- @Override
- public BufferProvider getBufferProvider(final JobID jobID, final ChannelID sourceChannelID) throws IOException,
- InterruptedException {
-
- final TransferEnvelopeReceiverList receiverList = getReceiverList(jobID, sourceChannelID);
-
- // Receiver could not be determined, use transit buffer pool to read data from channel
- if (receiverList == null) {
- return this.transitBufferPool;
- }
-
- if (receiverList.hasLocalReceivers() && !receiverList.hasRemoteReceivers()) {
-
- final List<ChannelID> localReceivers = receiverList.getLocalReceivers();
- if (localReceivers.size() == 1) {
- // Unicast case, get final buffer provider
-
- final ChannelID localReceiver = localReceivers.get(0);
- final ChannelContext cc = this.registeredChannels.get(localReceiver);
- if (cc == null) {
-
- // Use the transit buffer for this purpose, data will be discarded in most cases anyway.
- return this.transitBufferPool;
- }
-
- if (!cc.isInputChannel()) {
- throw new IOException("Channel context for local receiver " + localReceiver
- + " is not an input channel context");
- }
-
- final InputChannelContext icc = (InputChannelContext) cc;
-
- return icc;
- }
- }
-
- return this.transitBufferPool;
- }
-
- /**
- * Checks if the byte buffered channel manager has enough resources available to safely execute the given task.
- *
- * @param task
- * the task to be executed
- * @throws InsufficientResourcesException
- * thrown if the byte buffered manager currently does not have enough resources available to execute the
- * task
- */
- private void checkBufferAvailability(final Task task) throws InsufficientResourcesException {
-
- final int totalNumberOfBuffers = GlobalBufferPool.getInstance().getTotalNumberOfBuffers();
- int numberOfAlreadyRegisteredChannels = this.registeredChannels.size();
- if (this.multicastEnabled) {
- numberOfAlreadyRegisteredChannels += NUMBER_OF_CHANNELS_FOR_MULTICAST;
- }
-
- final Environment env = task.getEnvironment();
-
- final int numberOfNewChannels = env.getNumberOfOutputChannels() + env.getNumberOfInputChannels();
- final int totalNumberOfChannels = numberOfAlreadyRegisteredChannels + numberOfNewChannels;
-
- final double buffersPerChannel = (double) totalNumberOfBuffers
- / (double) totalNumberOfChannels;
-
- if (buffersPerChannel < 1.0) {
-
- // Construct error message
- final StringBuilder sb = new StringBuilder(this.localConnectionInfo.getHostName());
- sb.append(" has not enough buffers available to safely execute ");
- sb.append(env.getTaskName());
- sb.append(" (");
- sb.append(totalNumberOfChannels - totalNumberOfBuffers);
- sb.append(" buffers are currently missing)");
-
- throw new InsufficientResourcesException(sb.toString());
- }
- }
-
- /**
- * Redistributes the global buffers among the registered tasks.
- */
- private void redistributeGlobalBuffers() {
-
- final int totalNumberOfBuffers = GlobalBufferPool.getInstance().getTotalNumberOfBuffers();
- int totalNumberOfChannels = this.registeredChannels.size();
- if (this.multicastEnabled) {
- totalNumberOfChannels += NUMBER_OF_CHANNELS_FOR_MULTICAST;
- }
- final double buffersPerChannel = (double) totalNumberOfBuffers / (double) totalNumberOfChannels;
- if (buffersPerChannel < 1.0) {
- LOG.warn("System is low on memory buffers. This may result in reduced performance.");
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Total number of buffers is " + totalNumberOfBuffers);
- LOG.debug("Total number of channels is " + totalNumberOfChannels);
- }
-
- if (this.localBufferPoolOwner.isEmpty()) {
- return;
- }
-
- final Iterator<LocalBufferPoolOwner> it = this.localBufferPoolOwner.values().iterator();
- while (it.hasNext()) {
- final LocalBufferPoolOwner lbpo = it.next();
- lbpo.setDesignatedNumberOfBuffers((int) Math.ceil(buffersPerChannel * lbpo.getNumberOfChannels()));
- }
-
- if (this.multicastEnabled) {
- this.transitBufferPool.setDesignatedNumberOfBuffers((int) Math.ceil(buffersPerChannel
- * NUMBER_OF_CHANNELS_FOR_MULTICAST));
- }
- }
-
- /**
- * Invalidates the entries identified by the given channel IDs from the receiver lookup cache.
- *
- * @param channelIDs
- * the channel IDs identifying the cache entries to invalidate
- */
- public void invalidateLookupCacheEntries(final Set<ChannelID> channelIDs) {
-
- final Iterator<ChannelID> it = channelIDs.iterator();
- while (it.hasNext()) {
-
- this.receiverCache.remove(it.next());
- }
- }
-
- public void reportAsynchronousEvent(final ExecutionVertexID vertexID) {
-
- final LocalBufferPoolOwner lbpo = this.localBufferPoolOwner.get(vertexID);
- if (lbpo == null) {
- System.out.println("Cannot find local buffer pool owner for " + vertexID);
- return;
- }
-
- lbpo.reportAsynchronousEvent();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/CanceledChannelSet.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/CanceledChannelSet.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/CanceledChannelSet.java
deleted file mode 100644
index f1efdc7..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/CanceledChannelSet.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.bytebuffered;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-
-import eu.stratosphere.nephele.io.channels.ChannelID;
-
-/**
- * This channel set stores the ID's of all channels whose tasks have been canceled recently. The set is cleaned up by
- * periodically calling the method <code>cleanup</code>.
- * <p>
- * This class is thread-safe.
- *
- */
-public final class CanceledChannelSet implements Set<ChannelID> {
-
- /**
- * The period of time the entries must at least remain in the map.
- */
- private final static long CLEANUP_INTERVAL = 30000; // 30 sec.
-
- /**
- * The map which stores the ID's of the channels whose tasks have been canceled.
- */
- private final Map<ChannelID, Long> canceledChannels = new HashMap<ChannelID, Long>();
-
-
- @Override
- public boolean add(ChannelID arg0) {
-
- final long now = System.currentTimeMillis();
-
- synchronized (this.canceledChannels) {
- if (this.canceledChannels.put(arg0, Long.valueOf(now)) == null) {
- return true;
- }
- }
-
- return false;
- }
-
-
- @Override
- public boolean addAll(Collection<? extends ChannelID> arg0) {
-
- final Long now = Long.valueOf(System.currentTimeMillis());
- final Iterator<? extends ChannelID> it = arg0.iterator();
- boolean retVal = false;
-
- synchronized (this.canceledChannels) {
-
- while (it.hasNext()) {
-
- if (this.canceledChannels.put(it.next(), now) == null) {
- retVal = true;
- }
- }
- }
-
- return retVal;
- }
-
-
- @Override
- public void clear() {
-
- synchronized (this.canceledChannels) {
- this.canceledChannels.clear();
- }
-
- }
-
-
- @Override
- public boolean contains(Object arg0) {
-
- synchronized (this.canceledChannels) {
- return this.canceledChannels.containsKey(arg0);
- }
- }
-
-
- @Override
- public boolean containsAll(Collection<?> arg0) {
-
- synchronized (this.canceledChannels) {
- return this.canceledChannels.keySet().containsAll(arg0);
- }
- }
-
-
- @Override
- public boolean isEmpty() {
-
- synchronized (this.canceledChannels) {
- return this.canceledChannels.isEmpty();
- }
- }
-
-
- @Override
- public Iterator<ChannelID> iterator() {
-
- synchronized (this.canceledChannels) {
- return this.canceledChannels.keySet().iterator();
- }
- }
-
-
- @Override
- public boolean remove(Object arg0) {
-
- synchronized (this.canceledChannels) {
- if (this.canceledChannels.remove(arg0) == null) {
- return false;
- }
- }
-
- return true;
- }
-
-
- @Override
- public boolean removeAll(Collection<?> arg0) {
-
- final Iterator<?> it = arg0.iterator();
- boolean retVal = false;
-
- synchronized (this.canceledChannels) {
-
- while (it.hasNext()) {
- if (this.canceledChannels.remove(it.next()) != null) {
- retVal = true;
- }
- }
- }
-
- return retVal;
- }
-
-
- @Override
- public boolean retainAll(Collection<?> arg0) {
-
- throw new RuntimeException("Method not implemented");
- }
-
-
- @Override
- public int size() {
-
- synchronized (this.canceledChannels) {
- return this.canceledChannels.size();
- }
- }
-
-
- @Override
- public Object[] toArray() {
-
- synchronized (this.canceledChannels) {
- return this.canceledChannels.keySet().toArray();
- }
- }
-
-
- @Override
- public <T> T[] toArray(T[] arg0) {
-
- synchronized (this.canceledChannels) {
- return this.canceledChannels.keySet().toArray(arg0);
- }
- }
-
- /**
- * Removes all entries from the set which have been added longer than <code>CLEANUP_INTERVAL</code> milliseconds
- * ago.
- */
- public void cleanup() {
-
- final long now = System.currentTimeMillis();
-
- synchronized (this.canceledChannels) {
-
- final Iterator<Map.Entry<ChannelID, Long>> it = this.canceledChannels.entrySet().iterator();
- while (it.hasNext()) {
-
- final Map.Entry<ChannelID, Long> entry = it.next();
- if ((entry.getValue().longValue() + CLEANUP_INTERVAL) < now) {
- it.remove();
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/ChannelContext.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/ChannelContext.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/ChannelContext.java
deleted file mode 100644
index b3a9200..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/ChannelContext.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.bytebuffered;
-
-import eu.stratosphere.nephele.io.channels.ChannelID;
-import eu.stratosphere.nephele.io.channels.ChannelType;
-import eu.stratosphere.nephele.jobgraph.JobID;
-import eu.stratosphere.nephele.taskmanager.transferenvelope.TransferEnvelope;
-
-public interface ChannelContext {
-
- boolean isInputChannel();
-
- JobID getJobID();
-
- ChannelID getChannelID();
-
- ChannelID getConnectedChannelID();
-
- ChannelType getType();
-
- void queueTransferEnvelope(TransferEnvelope transferEnvelope);
-
- void destroy();
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/ConnectionInfoLookupResponse.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/ConnectionInfoLookupResponse.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/ConnectionInfoLookupResponse.java
deleted file mode 100644
index 69daa0e..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/ConnectionInfoLookupResponse.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/***********************************************************************************************************************
- *
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- *
-
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.bytebuffered;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.List;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.nephele.io.channels.ChannelID;
-import eu.stratosphere.nephele.util.EnumUtils;
-import eu.stratosphere.nephele.util.SerializableArrayList;
-
-public class ConnectionInfoLookupResponse implements IOReadableWritable {
-
- private enum ReturnCode {
- NOT_FOUND, FOUND_AND_RECEIVER_READY, FOUND_BUT_RECEIVER_NOT_READY, JOB_IS_ABORTING
- };
-
- // was request successful?
- private ReturnCode returnCode;
-
- /**
- * Contains next-hop instances, this instance must forward multicast transmissions to.
- */
- private final SerializableArrayList<RemoteReceiver> remoteTargets = new SerializableArrayList<RemoteReceiver>();
-
- /**
- * Contains local ChannelIDs, multicast packets must be forwarded to.
- */
- private final SerializableArrayList<ChannelID> localTargets = new SerializableArrayList<ChannelID>();
-
- public ConnectionInfoLookupResponse() {
- this.returnCode = ReturnCode.NOT_FOUND;
- }
-
- public void addRemoteTarget(final RemoteReceiver remote) {
- this.remoteTargets.add(remote);
- }
-
- public void addLocalTarget(ChannelID local) {
- this.localTargets.add(local);
- }
-
- private void setReturnCode(ReturnCode code) {
- this.returnCode = code;
- }
-
- public List<RemoteReceiver> getRemoteTargets() {
- return this.remoteTargets;
- }
-
- public List<ChannelID> getLocalTargets() {
- return this.localTargets;
- }
-
- @Override
- public void read(DataInput in) throws IOException {
-
- this.localTargets.read(in);
- this.remoteTargets.read(in);
-
- this.returnCode = EnumUtils.readEnum(in, ReturnCode.class);
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
-
- this.localTargets.write(out);
- this.remoteTargets.write(out);
-
- EnumUtils.writeEnum(out, this.returnCode);
-
- }
-
- public boolean receiverNotFound() {
-
- return (this.returnCode == ReturnCode.NOT_FOUND);
- }
-
- public boolean receiverNotReady() {
-
- return (this.returnCode == ReturnCode.FOUND_BUT_RECEIVER_NOT_READY);
- }
-
- public boolean receiverReady() {
-
- return (this.returnCode == ReturnCode.FOUND_AND_RECEIVER_READY);
- }
-
- public boolean isJobAborting() {
-
- return (this.returnCode == ReturnCode.JOB_IS_ABORTING);
- }
-
- public static ConnectionInfoLookupResponse createReceiverFoundAndReady(final ChannelID targetChannelID) {
-
- final ConnectionInfoLookupResponse response = new ConnectionInfoLookupResponse();
- response.setReturnCode(ReturnCode.FOUND_AND_RECEIVER_READY);
- response.addLocalTarget(targetChannelID);
-
- return response;
- }
-
- public static ConnectionInfoLookupResponse createReceiverFoundAndReady(final RemoteReceiver remoteReceiver) {
-
- final ConnectionInfoLookupResponse response = new ConnectionInfoLookupResponse();
- response.setReturnCode(ReturnCode.FOUND_AND_RECEIVER_READY);
- response.addRemoteTarget(remoteReceiver);
-
- return response;
- }
-
- /**
- * Constructor used to generate a plain ConnectionInfoLookupResponse object to be filled with multicast targets.
- *
- * @return
- */
- public static ConnectionInfoLookupResponse createReceiverFoundAndReady() {
-
- final ConnectionInfoLookupResponse response = new ConnectionInfoLookupResponse();
- response.setReturnCode(ReturnCode.FOUND_AND_RECEIVER_READY);
-
- return response;
- }
-
- public static ConnectionInfoLookupResponse createReceiverNotFound() {
- final ConnectionInfoLookupResponse response = new ConnectionInfoLookupResponse();
- response.setReturnCode(ReturnCode.NOT_FOUND);
-
- return response;
- }
-
- public static ConnectionInfoLookupResponse createReceiverNotReady() {
- final ConnectionInfoLookupResponse response = new ConnectionInfoLookupResponse();
- response.setReturnCode(ReturnCode.FOUND_BUT_RECEIVER_NOT_READY);
-
- return response;
- }
-
- public static ConnectionInfoLookupResponse createJobIsAborting() {
- final ConnectionInfoLookupResponse response = new ConnectionInfoLookupResponse();
- response.setReturnCode(ReturnCode.JOB_IS_ABORTING);
-
- return response;
- }
-
- @Override
- public String toString() {
- StringBuilder returnstring = new StringBuilder();
- returnstring.append("local targets (total: " + this.localTargets.size() + "):\n");
- for (ChannelID i : this.localTargets) {
- returnstring.append(i + "\n");
- }
- returnstring.append("remote targets: (total: " + this.remoteTargets.size() + "):\n");
- for (final RemoteReceiver rr : this.remoteTargets) {
- returnstring.append(rr + "\n");
- }
- return returnstring.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/GateContext.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/GateContext.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/GateContext.java
deleted file mode 100644
index 7b8ce36..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/GateContext.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.bytebuffered;
-
-import eu.stratosphere.nephele.io.GateID;
-
-public interface GateContext {
-
- GateID getGateID();
-}