You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/06/10 21:35:18 UTC
[21/34] Offer buffer-oriented API for I/O (#25)
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/LocalChannelWithAccessInfo.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/LocalChannelWithAccessInfo.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/LocalChannelWithAccessInfo.java
deleted file mode 100644
index 173021f..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/LocalChannelWithAccessInfo.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.channels;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.channels.FileChannel;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-final class LocalChannelWithAccessInfo implements ChannelWithAccessInfo {
-
- /**
- * The logging object.
- */
- private static final Log LOG = LogFactory.getLog(LocalChannelWithAccessInfo.class);
-
- private final File file;
-
- private final FileChannel channel;
-
- private final AtomicLong reservedWritePosition;
-
- private final AtomicInteger referenceCounter;
-
- private final AtomicBoolean deleteOnClose;
-
- LocalChannelWithAccessInfo(final File file, final boolean deleteOnClose) throws IOException {
-
- this.file = file;
- this.channel = new RandomAccessFile(file, "rw").getChannel();
- this.reservedWritePosition = new AtomicLong(0L);
- this.referenceCounter = new AtomicInteger(0);
- this.deleteOnClose = new AtomicBoolean(deleteOnClose);
- }
-
-
- @Override
- public FileChannel getChannel() {
-
- return this.channel;
- }
-
-
- @Override
- public FileChannel getAndIncrementReferences() {
-
- if (incrementReferences()) {
- return this.channel;
- } else {
- return null;
- }
- }
-
- @Override
- public ChannelWithPosition reserveWriteSpaceAndIncrementReferences(final int spaceToReserve) {
-
- if (incrementReferences()) {
- return new ChannelWithPosition(this.channel, this.reservedWritePosition.getAndAdd(spaceToReserve));
- } else {
- return null;
- }
- }
-
-
- @Override
- public int decrementReferences() {
-
- int current = this.referenceCounter.get();
- while (true) {
- if (current <= 0) {
- // this is actually an error case, because the channel was deleted before
- throw new IllegalStateException("The references to the file were already at zero.");
- }
-
- if (current == 1) {
- // this call decrements to zero, so mark it as deleted
- if (this.referenceCounter.compareAndSet(current, Integer.MIN_VALUE)) {
- current = 0;
- break;
- }
- } else if (this.referenceCounter.compareAndSet(current, current - 1)) {
- current = current - 1;
- break;
- }
- current = this.referenceCounter.get();
- }
-
- if (current > 0) {
- return current;
- } else if (current == 0) {
- // delete the channel
- this.referenceCounter.set(Integer.MIN_VALUE);
- this.reservedWritePosition.set(Long.MIN_VALUE);
- try {
- this.channel.close();
- } catch (IOException ioex) {
- if (LOG.isErrorEnabled()) {
- LOG.error("Error while closing spill file for file buffers: " + ioex.getMessage(), ioex);
- }
- }
- if (this.deleteOnClose.get()) {
- this.file.delete();
- }
- return current;
- } else {
- throw new IllegalStateException("The references to the file were already at zero.");
- }
- }
-
-
- @Override
- public boolean incrementReferences() {
-
- int current = this.referenceCounter.get();
- while (true) {
- // check whether it was disposed in the meantime
- if (current < 0) {
- return false;
- }
- // atomically check and increment
- if (this.referenceCounter.compareAndSet(current, current + 1)) {
- return true;
- }
- current = this.referenceCounter.get();
- }
- }
-
-
- @Override
- public void disposeSilently() {
-
- this.referenceCounter.set(Integer.MIN_VALUE);
- this.reservedWritePosition.set(Long.MIN_VALUE);
-
- if (this.channel.isOpen()) {
- try {
- this.channel.close();
- } catch (Throwable t) {
- }
- }
-
- if (this.deleteOnClose.get()) {
- this.file.delete();
- }
- }
-
-
- @Override
- public void updateDeleteOnCloseFlag(final boolean deleteOnClose) {
-
- this.deleteOnClose.compareAndSet(true, deleteOnClose);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/MemoryBuffer.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/MemoryBuffer.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/MemoryBuffer.java
deleted file mode 100644
index 8b2c0fb..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/MemoryBuffer.java
+++ /dev/null
@@ -1,249 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.channels;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.WritableByteChannel;
-
-import eu.stratosphere.core.memory.MemorySegment;
-
-public final class MemoryBuffer extends Buffer {
-
- private final MemoryBufferRecycler bufferRecycler;
-
- private final MemorySegment internalMemorySegment;
-
- /**
- * Internal index that points to the next byte to write
- */
- private int index = 0;
-
- /**
- * Internal limit to simulate ByteBuffer behavior of MemorySegment. index > limit is not allowed.
- */
- private int limit = 0;
-
- MemoryBuffer(final int bufferSize, final MemorySegment memory, final MemoryBufferPoolConnector bufferPoolConnector) {
- if (bufferSize > memory.size()) {
- throw new IllegalArgumentException("Requested segment size is " + bufferSize
- + ", but provided MemorySegment only has a capacity of " + memory.size());
- }
-
- this.bufferRecycler = new MemoryBufferRecycler(memory, bufferPoolConnector);
- this.internalMemorySegment = memory;
- this.position(0);
- this.limit(bufferSize);
- }
-
- private MemoryBuffer(final int bufferSize, final int pos, final MemorySegment memory, final MemoryBufferRecycler bufferRecycler) {
- this.bufferRecycler = bufferRecycler;
- this.internalMemorySegment = memory;
- this.position(pos);
- this.limit(bufferSize);
- }
-
- @Override
- public int read(ByteBuffer dst) throws IOException {
-
- if (!this.hasRemaining()) {
- return -1;
- }
- int numBytes = dst.remaining();
- final int remaining = this.remaining();
- if (numBytes == 0) {
- return 0;
- }
- if(numBytes > remaining) {
- numBytes = remaining;
- }
- internalMemorySegment.get(position(), dst, numBytes);
- index += numBytes;
- return numBytes;
- }
-
-
- @Override
- public int writeTo(WritableByteChannel writableByteChannel) throws IOException {
- if (!this.hasRemaining()) {
- return -1;
- }
-
- final ByteBuffer wrapped = this.internalMemorySegment.wrap(index, limit-index);
- final int written = writableByteChannel.write(wrapped);
- position(wrapped.position());
- return written;
- }
-
-
- @Override
- public void close() throws IOException {
-
- this.position(this.limit());
- }
-
-
- @Override
- public boolean isOpen() {
-
- return this.hasRemaining();
- }
-
- /**
- * Resets the memory buffer.
- *
- * @param bufferSize
- * the size of buffer in bytes after the reset
- */
- public final void reset(final int bufferSize) {
- if(bufferSize > this.internalMemorySegment.size()) {
- throw new RuntimeException("Given buffer size exceeds underlying buffer size");
- }
- this.position(0);
- this.limit(bufferSize);
- }
-
- public final void position(final int i) {
- if(i > limit) {
- throw new IndexOutOfBoundsException("new position is larger than the limit");
- }
- index = i;
- }
-
- @Override
- public final int position() {
- return index;
- }
-
- public final void limit(final int l) {
- if(limit > internalMemorySegment.size()) {
- throw new RuntimeException("Limit is larger than MemoryBuffer size");
- }
- if (index > limit) {
- index = limit;
- }
- limit = l;
- }
-
- public final int limit() {
- return limit;
- }
-
- public final boolean hasRemaining() {
- return index < limit;
- }
-
- public final int remaining() {
- return limit - index;
- }
-
- /**
- * Put MemoryBuffer into read mode
- */
- public final void flip() {
- limit = position();
- position(0);
- }
-
- public void clear() {
- this.limit = getTotalSize();
- this.position(0);
- }
-
- /**
- *
- * @return Returns the size of the underlying MemorySegment
- */
- public int getTotalSize() {
- return this.internalMemorySegment.size();
- }
-
- @Override
- public final int size() {
- return this.limit();
- }
-
- public MemorySegment getMemorySegment() {
- return this.internalMemorySegment;
- }
-
-
- @Override
- protected void recycle() {
- this.bufferRecycler.decreaseReferenceCounter();
- if(bufferRecycler.referenceCounter.get() == 0) {
- clear();
- }
- }
-
-
-
- @Override
- public boolean isBackedByMemory() {
- return true;
- }
-
-
- @Override
- public MemoryBuffer duplicate() {
- final MemoryBuffer duplicatedMemoryBuffer = new MemoryBuffer(this.limit(), this.position(), this.internalMemorySegment, this.bufferRecycler);
- this.bufferRecycler.increaseReferenceCounter();
- return duplicatedMemoryBuffer;
- }
-
-
- @Override
- public void copyToBuffer(final Buffer destinationBuffer) throws IOException {
- if (size() > destinationBuffer.size()) {
- throw new IllegalArgumentException("Destination buffer is too small to store content of source buffer: "
- + size() + " vs. " + destinationBuffer.size());
- }
- final MemoryBuffer target = (MemoryBuffer) destinationBuffer;
- this.internalMemorySegment.copyTo(this.position(), target.getMemorySegment(), destinationBuffer.position(), limit()-position());
- target.position(limit()-position()); // even if we do not change the source (this), we change the destination!!
- destinationBuffer.flip();
- }
-
-
-
- @Override
- public int write(final ByteBuffer src) throws IOException {
- int numBytes = src.remaining();
- final int thisRemaining = this.remaining();
- if(thisRemaining == 0) {
- return 0;
- }
- if(numBytes > thisRemaining) {
- numBytes = thisRemaining;
- }
- this.internalMemorySegment.put(position(), src, numBytes);
- this.index += numBytes;
- return numBytes;
- }
-
-
- @Override
- public int write(final ReadableByteChannel readableByteChannel) throws IOException {
-
- if (!this.hasRemaining()) {
- return 0;
- }
- ByteBuffer wrapper = this.internalMemorySegment.wrap(index, limit-index);
- final int written = readableByteChannel.read(wrapper);
- this.position(wrapper.position());
- this.limit(wrapper.limit());
- return written;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/MemoryBufferPoolConnector.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/MemoryBufferPoolConnector.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/MemoryBufferPoolConnector.java
deleted file mode 100644
index bd8519d..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/MemoryBufferPoolConnector.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.channels;
-
-import eu.stratosphere.core.memory.MemorySegment;
-
-/**
- * The memory buffer pool connector provides a connection between {@link MemoryBuffer} and the {@link LocalBufferPool}
- * the memory buffer's encapsulated byte buffer has originally been taken from.
- *
- */
-public interface MemoryBufferPoolConnector {
-
- /**
- * Called by the {@link MemoryBufferRecycler} to return a buffer to its original buffer pool.
- *
- * @param byteBuffer
- * the buffer to be recycled
- */
- void recycle(MemorySegment memSeg);
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/MemoryBufferRecycler.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/MemoryBufferRecycler.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/MemoryBufferRecycler.java
deleted file mode 100644
index 141e3a1..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/MemoryBufferRecycler.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.channels;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import eu.stratosphere.core.memory.MemorySegment;
-
-/**
- * A memory buffer recycler takes care of the correct recycling of the internal byte buffer which backs a memory buffer.
- * Since buffer objects can be duplicated, i.e. multiple buffer objects point to the same physical buffer, it is
- * necessary to coordinate the recycling of the physical buffer.
- * <p>
- * This class is thread-safe.
- *
- */
-public final class MemoryBufferRecycler {
-
- /**
- * The log object used to report debug information and possible errors.
- */
- private static final Log LOG = LogFactory.getLog(MemoryBufferRecycler.class);
-
- /**
- * The original memory segment which has been taken from byte buffered channel manager's buffer pool.
- */
- private final MemorySegment originalSegment;
-
- /**
- * The connection to the pool from which the byte buffer has originally been taken.
- */
- private final MemoryBufferPoolConnector bufferPoolConnector;
-
- /**
- * The number of memory buffer objects which may still access the physical buffer.
- */
- public final AtomicInteger referenceCounter = new AtomicInteger(1);
-
- /**
- * Constructs a new memory buffer recycler.
- *
- * @param originalBuffer
- * the original byte buffer
- * @param bufferPoolConnector
- * the connection to the pool from which the byte buffer has originally been taken
- */
- MemoryBufferRecycler(final MemorySegment originalSegment, final MemoryBufferPoolConnector bufferPoolConnector) {
-
- this.originalSegment = originalSegment;
- this.bufferPoolConnector = bufferPoolConnector;
- }
-
- /**
- * Increases the number of references to the physical buffer by one.
- */
- void increaseReferenceCounter() {
-
- if (this.referenceCounter.getAndIncrement() == 0) {
- LOG.error("Increasing reference counter from 0 to 1");
- }
- }
-
- /**
- * Decreases the number of references to the physical buffer by one. If the number of references becomes zero the
- * physical buffer is recycled.
- */
- void decreaseReferenceCounter() {
-
- final int val = this.referenceCounter.decrementAndGet();
- if (val == 0) {
- this.bufferPoolConnector.recycle(this.originalSegment);
-
- } else if (val < 0) {
- LOG.error("reference counter is negative");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/SerializationBuffer.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/SerializationBuffer.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/SerializationBuffer.java
deleted file mode 100644
index 55bcb56..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/SerializationBuffer.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.channels;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.WritableByteChannel;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.nephele.io.DataOutputBuffer;
-
-/**
- * A class for serializing a record to its binary representation.
- *
- * @param <T>
- * the type of the record this serialization buffer can be used for
- */
-public class SerializationBuffer<T extends IOReadableWritable> {
-
- private static final int SIZEOFINT = 4;
-
- private DataOutputBuffer serializationBuffer = new DataOutputBuffer();
-
- private ByteBuffer lengthBuf = ByteBuffer.allocate(SIZEOFINT);
-
- private int bytesReadFromBuffer = 0;
-
- /**
- * Translates an integer into an array of bytes.
- *
- * @param val
- * The integer to be translated
- * @param arr
- * The byte buffer to store the data of the integer
- */
- private void integerToByteBuffer(final int val, final ByteBuffer byteBuffer) {
-
- for (int i = 0; i < SIZEOFINT; ++i) {
- final int shift = i << (SIZEOFINT - 1); // i * 8
- byteBuffer.put(SIZEOFINT - 1 - i, (byte) ((val & (0xff << shift)) >>> shift));
- }
-
- byteBuffer.position(0);
- byteBuffer.limit(SIZEOFINT);
- }
-
- /**
- * Return <code>true</code> if the internal serialization buffer still contains data.
- * In this case the method serialize must not be called. If the internal buffer is empty
- * the method return <code>false</code>
- *
- * @return <code>true</code> if the internal serialization buffer still contains data, <code>false</code> it it is
- * empty
- */
- public boolean dataLeftFromPreviousSerialization() {
- return leftInSerializationBuffer() > 0;
- }
-
- /**
- * Reads the internal serialization buffer and writes the data to the given {@link WritableByteChannel} byte
- * channel.
- *
- * @param writableByteChannel
- * the byte channel to write the serialized data to
- * @return the number of bytes written the to given byte channel
- * @throws IOException
- * thrown if an error occurs while writing to serialized data to the channel
- */
- public int read(final WritableByteChannel writableByteChannel) throws IOException {
-
- int bytesReadFromLengthBuf = 0;
-
- // Deal with length buffer first
- if (this.lengthBuf.hasRemaining()) { // There is data from the length buffer to be written
- bytesReadFromLengthBuf = writableByteChannel.write(this.lengthBuf);
- }
-
- final int bytesReadFromSerializationBuf = writableByteChannel.write(this.serializationBuffer.getData());
- // byteBuffer.put(this.serializationBuffer.getData(), this.bytesReadFromBuffer, length);
- this.bytesReadFromBuffer += bytesReadFromSerializationBuf;
-
- if (leftInSerializationBuffer() == 0) { // Record is entirely written to byteBuffer
- this.serializationBuffer.reset();
- this.bytesReadFromBuffer = 0;
- }
-
- return (bytesReadFromSerializationBuf + bytesReadFromLengthBuf);
- }
-
- /**
- * Return the number of bytes that have not been read from the internal serialization
- * buffer so far.
- *
- * @return the number of bytes that have not been read from the internal serialization buffer so far
- */
- private int leftInSerializationBuffer() {
-
- return (this.serializationBuffer.getLength() - this.bytesReadFromBuffer);
- }
-
- /**
- * Serializes the record and writes it to an internal buffer. The buffer grows dynamically
- * in case more memory is required to serialization.
- *
- * @param record
- * The record to the serialized
- * @throws IOException
- * Thrown if data from a previous serialization process is still in the internal buffer and has not yet been
- * transfered to a byte buffer
- */
- public void serialize(final T record) throws IOException {
-
- // Check if there is data left in the buffer
- if (dataLeftFromPreviousSerialization()) {
- throw new IOException("Cannot write new data, " + leftInSerializationBuffer()
- + " bytes still left from previous call");
- }
-
- record.write(this.serializationBuffer); // serializationBuffer grows dynamically
-
- // Now record is completely in serializationBuffer;
- integerToByteBuffer(this.serializationBuffer.getLength(), this.lengthBuf);
- }
-
- public void clear() {
- this.bytesReadFromBuffer = 0;
- this.lengthBuf.clear();
- this.serializationBuffer.reset();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/AbstractByteBufferedInputChannel.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/AbstractByteBufferedInputChannel.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/AbstractByteBufferedInputChannel.java
deleted file mode 100644
index 0009d5a..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/AbstractByteBufferedInputChannel.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.channels.bytebuffered;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.nephele.event.task.AbstractEvent;
-import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
-import eu.stratosphere.nephele.io.InputChannelResult;
-import eu.stratosphere.nephele.io.InputGate;
-import eu.stratosphere.nephele.io.RecordDeserializer;
-import eu.stratosphere.nephele.io.channels.AbstractInputChannel;
-import eu.stratosphere.nephele.io.channels.Buffer;
-import eu.stratosphere.nephele.io.channels.ChannelID;
-
-/**
- * @param <T> The type of record that can be transported through this channel.
- */
-public abstract class AbstractByteBufferedInputChannel<T extends IOReadableWritable> extends AbstractInputChannel<T> {
-
- /**
- * The log object used to report warnings and errors.
- */
- private static final Log LOG = LogFactory.getLog(AbstractByteBufferedInputChannel.class);
-
- /**
- * The deserializer used to deserialize records.
- */
- private final RecordDeserializer<T> deserializer;
-
- /**
- * Buffer for the uncompressed (raw) data.
- */
- private Buffer dataBuffer;
-
- private ByteBufferedInputChannelBroker inputChannelBroker;
-
- private AbstractTaskEvent currentEvent;
-
- /**
- * The exception observed in this channel while processing the buffers. Checked and thrown
- * per-buffer.
- */
- private volatile IOException ioException;
-
- /**
- * Stores the number of bytes read through this input channel since its instantiation.
- */
- private long amountOfDataTransmitted;
-
-
- private volatile boolean brokerAggreedToCloseChannel;
-
- /**
- * Creates a new input channel.
- *
- * @param inputGate
- * the input gate this channel is wired to
- * @param channelIndex
- * the channel's index at the associated input gate
- * @param type
- * the type of record transported through this channel
- * @param channelID
- * the ID of the channel
- * @param connectedChannelID
- * the ID of the channel this channel is connected to
- */
- public AbstractByteBufferedInputChannel(final InputGate<T> inputGate, final int channelIndex,
- final RecordDeserializer<T> deserializer, final ChannelID channelID, final ChannelID connectedChannelID) {
- super(inputGate, channelIndex, channelID, connectedChannelID);
- this.deserializer = deserializer;
- }
-
- @Override
- public InputChannelResult readRecord(T target) throws IOException {
- if (this.dataBuffer == null) {
- if (isClosed()) {
- return InputChannelResult.END_OF_STREAM;
- }
-
- // get the next element we need to handle (buffer or event)
- BufferOrEvent boe = this.inputChannelBroker.getNextBufferOrEvent();
-
- if (boe == null) {
- throw new IllegalStateException("Input channel was queries for data even though none was announced available.");
- }
-
- // handle events
- if (boe.isEvent())
- {
- // sanity check: an event may only come after a complete record.
- if (this.deserializer.hasUnfinishedData()) {
- throw new IOException("Channel received an event before completing the current partial record.");
- }
-
- AbstractEvent evt = boe.getEvent();
- if (evt.getClass() == ByteBufferedChannelCloseEvent.class) {
- this.brokerAggreedToCloseChannel = true;
- return InputChannelResult.END_OF_STREAM;
- }
- else if (evt.getClass() == EndOfSuperstepEvent.class) {
- return InputChannelResult.END_OF_SUPERSTEP;
- }
- else if (evt instanceof AbstractTaskEvent) {
- this.currentEvent = (AbstractTaskEvent) evt;
- return InputChannelResult.TASK_EVENT;
- }
- else {
- LOG.error("Received unknown event: " + evt);
- return InputChannelResult.NONE;
- }
- } else {
- // buffer case
- this.dataBuffer = boe.getBuffer();
- }
- }
-
- // get the next record form the buffer
- T nextRecord = this.deserializer.readData(target, this.dataBuffer);
-
- // release the buffer if it is empty
- if (this.dataBuffer.remaining() == 0) {
- releasedConsumedReadBuffer(this.dataBuffer);
- this.dataBuffer = null;
- return nextRecord == null ? InputChannelResult.NONE : InputChannelResult.LAST_RECORD_FROM_BUFFER;
- } else {
- return nextRecord == null ? InputChannelResult.NONE : InputChannelResult.INTERMEDIATE_RECORD_FROM_BUFFER;
- }
- }
-
- @Override
- public boolean isClosed() throws IOException{
- if (this.ioException != null) {
- throw new IOException("An error occurred in the channel: " + this.ioException.getMessage(), this.ioException);
- } else {
- return this.brokerAggreedToCloseChannel;
- }
- }
-
-
- @Override
- public void close() throws IOException, InterruptedException {
-
- this.deserializer.clear();
- if (this.dataBuffer != null) {
- releasedConsumedReadBuffer(this.dataBuffer);
- this.dataBuffer = null;
- }
-
- // This code fragment makes sure the isClosed method works in case the channel input has not been fully consumed
- while (!this.brokerAggreedToCloseChannel)
- {
- BufferOrEvent next = this.inputChannelBroker.getNextBufferOrEvent();
- if (next != null) {
- if (next.isEvent()) {
- if (next.getEvent() instanceof ByteBufferedChannelCloseEvent) {
- this.brokerAggreedToCloseChannel = true;
- }
- } else {
- releasedConsumedReadBuffer(next.getBuffer());
- }
- } else {
- Thread.sleep(200);
- }
- }
-
- // Send close event to indicate the input channel has successfully
- // processed all data it is interested in.
- transferEvent(new ByteBufferedChannelCloseEvent());
- }
-
-
- private void releasedConsumedReadBuffer(Buffer buffer) {
- this.amountOfDataTransmitted += buffer.size();
- buffer.recycleBuffer();
- }
-
-
- public void setInputChannelBroker(ByteBufferedInputChannelBroker inputChannelBroker) {
- this.inputChannelBroker = inputChannelBroker;
- }
-
-
- public void notifyGateThatInputIsAvailable() {
- this.getInputGate().notifyRecordIsAvailable(getChannelIndex());
- }
-
-
- @Override
- public void transferEvent(AbstractEvent event) throws IOException, InterruptedException {
- this.inputChannelBroker.transferEventToOutputChannel(event);
- }
-
-
- public void reportIOException(IOException ioe) {
- this.ioException = ioe;
- }
-
-
- @Override
- public void releaseAllResources() {
- this.brokerAggreedToCloseChannel = true;
- this.deserializer.clear();
-
- // The buffers are recycled by the input channel wrapper
- }
-
-
- @Override
- public long getAmountOfDataTransmitted() {
- return this.amountOfDataTransmitted;
- }
-
-
- /**
- * Notify the channel that a data unit has been consumed.
- */
- public void notifyDataUnitConsumed() {
- this.getInputGate().notifyDataUnitConsumed(getChannelIndex());
- }
-
- @Override
- public AbstractTaskEvent getCurrentEvent() {
- AbstractTaskEvent e = this.currentEvent;
- this.currentEvent = null;
- return e;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/AbstractByteBufferedOutputChannel.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/AbstractByteBufferedOutputChannel.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/AbstractByteBufferedOutputChannel.java
deleted file mode 100644
index 50d4a50..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/AbstractByteBufferedOutputChannel.java
+++ /dev/null
@@ -1,255 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.channels.bytebuffered;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.nephele.event.task.AbstractEvent;
-import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
-import eu.stratosphere.nephele.io.OutputGate;
-import eu.stratosphere.nephele.io.channels.AbstractOutputChannel;
-import eu.stratosphere.nephele.io.channels.Buffer;
-import eu.stratosphere.nephele.io.channels.ChannelID;
-import eu.stratosphere.nephele.io.channels.ChannelType;
-import eu.stratosphere.nephele.io.channels.SerializationBuffer;
-
-public abstract class AbstractByteBufferedOutputChannel<T extends IOReadableWritable> extends AbstractOutputChannel<T> {
-
- /**
- * The serialization buffer used to serialize records.
- */
- private final SerializationBuffer<T> serializationBuffer = new SerializationBuffer<T>();
-
- /**
- * Buffer for the serialized output data.
- */
- private Buffer dataBuffer = null;
-
- /**
- * Stores whether the channel is requested to be closed.
- */
- private boolean closeRequested = false;
-
- /**
- * The output channel broker the channel should contact to request and release write buffers.
- */
- private ByteBufferedOutputChannelBroker outputChannelBroker = null;
-
-
- /**
- * Stores the number of bytes transmitted through this output channel since its instantiation.
- */
- private long amountOfDataTransmitted = 0L;
-
- private static final Log LOG = LogFactory.getLog(AbstractByteBufferedOutputChannel.class);
-
- /**
- * Creates a new byte buffered output channel.
- *
- * @param outputGate
- * the output gate this channel is wired to
- * @param channelIndex
- * the channel's index at the associated output gate
- * @param channelID
- * the ID of the channel
- * @param connectedChannelID
- * the ID of the channel this channel is connected to
- */
- protected AbstractByteBufferedOutputChannel(final OutputGate<T> outputGate, final int channelIndex,
- final ChannelID channelID, final ChannelID connectedChannelID) {
- super(outputGate, channelIndex, channelID, connectedChannelID);
- }
-
-
- @Override
- public boolean isClosed() throws IOException, InterruptedException {
-
- if (this.closeRequested && this.dataBuffer == null
- && !this.serializationBuffer.dataLeftFromPreviousSerialization()) {
-
- if (!this.outputChannelBroker.hasDataLeftToTransmit()) {
- return true;
- }
- }
-
- return false;
- }
-
-
- @Override
- public void requestClose() throws IOException, InterruptedException {
-
- if (!this.closeRequested) {
- this.closeRequested = true;
- if (this.serializationBuffer.dataLeftFromPreviousSerialization()) {
- // make sure we serialized all data before we send the close event
- flush();
- }
-
- if (getType() == ChannelType.INMEMORY || !isBroadcastChannel() || getChannelIndex() == 0) {
- transferEvent(new ByteBufferedChannelCloseEvent());
- flush();
- }
- }
- }
-
- /**
- * Requests a new write buffer from the framework. This method blocks until the requested buffer is available.
- *
- * @throws InterruptedException
- * thrown if the thread is interrupted while waiting for the buffer
- * @throws IOException
- * thrown if an I/O error occurs while waiting for the buffer
- */
- private void requestWriteBufferFromBroker() throws InterruptedException, IOException {
- if (Thread.interrupted()) {
- throw new InterruptedException();
- }
- this.dataBuffer = this.outputChannelBroker.requestEmptyWriteBuffer();
- }
-
- /**
- * Returns the filled buffer to the framework and triggers further processing.
- *
- * @throws IOException
- * thrown if an I/O error occurs while releasing the buffers
- * @throws InterruptedException
- * thrown if the thread is interrupted while releasing the buffers
- */
- private void releaseWriteBuffer() throws IOException, InterruptedException {
- // Keep track of number of bytes transmitted through this channel
- this.amountOfDataTransmitted += this.dataBuffer.size();
-
- this.outputChannelBroker.releaseWriteBuffer(this.dataBuffer);
- this.dataBuffer = null;
- }
-
-
- @Override
- public void writeRecord(T record) throws IOException, InterruptedException {
-
- // Get a write buffer from the broker
- if (this.dataBuffer == null) {
- requestWriteBufferFromBroker();
- }
-
- if (this.closeRequested) {
- throw new IOException("Channel is aready requested to be closed");
- }
-
- // Check if we can accept new records or if there are still old
- // records to be transmitted
- while (this.serializationBuffer.dataLeftFromPreviousSerialization()) {
-
- this.serializationBuffer.read(this.dataBuffer);
- if (this.dataBuffer.remaining() == 0) {
- releaseWriteBuffer();
- requestWriteBufferFromBroker();
- }
- }
-
- if (this.serializationBuffer.dataLeftFromPreviousSerialization()) {
- throw new IOException("Serialization buffer is expected to be empty!");
- }
-
- this.serializationBuffer.serialize(record);
-
- this.serializationBuffer.read(this.dataBuffer);
- if (this.dataBuffer.remaining() == 0) {
- releaseWriteBuffer();
- }
- }
-
- /**
- * Sets the output channel broker this channel should contact to request and release write buffers.
- *
- * @param byteBufferedOutputChannelBroker
- * the output channel broker the channel should contact to request and release write buffers
- */
- public void setByteBufferedOutputChannelBroker(ByteBufferedOutputChannelBroker byteBufferedOutputChannelBroker) {
-
- this.outputChannelBroker = byteBufferedOutputChannelBroker;
- }
-
-
- public void processEvent(AbstractEvent event) {
-
- if (event instanceof AbstractTaskEvent) {
- getOutputGate().deliverEvent((AbstractTaskEvent) event);
- } else {
- LOG.error("Channel " + getID() + " received unknown event " + event);
- }
- }
-
-
- @Override
- public void transferEvent(AbstractEvent event) throws IOException, InterruptedException {
-
- flush();
- this.outputChannelBroker.transferEventToInputChannel(event);
- }
-
-
- @Override
- public void flush() throws IOException, InterruptedException {
-
- // Get rid of remaining data in the serialization buffer
- while (this.serializationBuffer.dataLeftFromPreviousSerialization()) {
-
- if (this.dataBuffer == null) {
-
- try {
- requestWriteBufferFromBroker();
- } catch (InterruptedException e) {
- LOG.error(e);
- }
- }
- this.serializationBuffer.read(this.dataBuffer);
- if (this.dataBuffer.remaining() == 0) {
- releaseWriteBuffer();
- }
- }
-
- // Get rid of the leased write buffer
- if (this.dataBuffer != null) {
- releaseWriteBuffer();
- }
- }
-
-
- @Override
- public void releaseAllResources() {
-
- // TODO: Reconsider release of broker's resources here
- this.closeRequested = true;
-
- this.serializationBuffer.clear();
-
- if (this.dataBuffer != null) {
- this.dataBuffer.recycleBuffer();
- this.dataBuffer = null;
- }
- }
-
-
- @Override
- public long getAmountOfDataTransmitted() {
-
- return this.amountOfDataTransmitted;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/BufferOrEvent.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/BufferOrEvent.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/BufferOrEvent.java
deleted file mode 100644
index f89c7fb..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/BufferOrEvent.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-package eu.stratosphere.nephele.io.channels.bytebuffered;
-
-import eu.stratosphere.nephele.event.task.AbstractEvent;
-import eu.stratosphere.nephele.io.channels.Buffer;
-
-/**
- * Either type for {@link Buffer} and {@link AbstractEvent}.
- */
-public class BufferOrEvent {
-
- private final Buffer buffer;
-
- private final AbstractEvent event;
-
- public BufferOrEvent(Buffer buffer) {
- this.buffer = buffer;
- this.event = null;
- }
-
- public BufferOrEvent(AbstractEvent event) {
- this.buffer = null;
- this.event = event;
- }
-
- public boolean isBuffer() {
- return this.buffer != null;
- }
-
- public boolean isEvent() {
- return this.event != null;
- }
-
- public Buffer getBuffer() {
- return this.buffer;
- }
-
- public AbstractEvent getEvent() {
- return this.event;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/ByteBufferedChannelCloseEvent.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/ByteBufferedChannelCloseEvent.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/ByteBufferedChannelCloseEvent.java
deleted file mode 100644
index f427b70..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/ByteBufferedChannelCloseEvent.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.channels.bytebuffered;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import eu.stratosphere.nephele.event.task.AbstractEvent;
-
-public class ByteBufferedChannelCloseEvent extends AbstractEvent {
-
- @Override
- public void read(DataInput in) throws IOException {
-
- // Nothing to do here
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- // Nothing to do here
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/ByteBufferedInputChannelBroker.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/ByteBufferedInputChannelBroker.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/ByteBufferedInputChannelBroker.java
deleted file mode 100644
index 4d259ec..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/ByteBufferedInputChannelBroker.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.channels.bytebuffered;
-
-import java.io.IOException;
-
-import eu.stratosphere.nephele.event.task.AbstractEvent;
-
-
-public interface ByteBufferedInputChannelBroker {
-
- public BufferOrEvent getNextBufferOrEvent() throws IOException;
-
- /**
- * Forwards the given event to the connected network output channel on a best effort basis.
- *
- * @param event
- * the event to be transferred
- * @throws InterruptedException
- * thrown if the thread is interrupted while waiting for the event to be transfered
- * @throws IOException
- * thrown if an I/O error occurs while transferring the event
- */
- void transferEventToOutputChannel(AbstractEvent event) throws IOException, InterruptedException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/ByteBufferedOutputChannelBroker.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/ByteBufferedOutputChannelBroker.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/ByteBufferedOutputChannelBroker.java
deleted file mode 100644
index af45e4c..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/ByteBufferedOutputChannelBroker.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.channels.bytebuffered;
-
-import java.io.IOException;
-
-import eu.stratosphere.nephele.event.task.AbstractEvent;
-import eu.stratosphere.nephele.io.channels.Buffer;
-
-public interface ByteBufferedOutputChannelBroker {
-
- /**
- * Requests an empty write buffer from the broker. This method will block
- * until the requested write buffer is available.
- *
- * @return the byte buffers to write in
- * @throws InterruptedException
- * thrown if the connected task is interrupted while waiting for the buffer
- * @throws IOException
- * thrown if an error occurs while requesting the empty write buffer.
- */
- Buffer requestEmptyWriteBuffer() throws InterruptedException, IOException;
-
- /**
- * Returns a filled write buffer to the broker. The broker will take care
- * of the buffers and transfer the user data to the connected input channel on a best effort basis.
- *
- * @param buffer
- * the buffer to be returned to the broker
- * @throws InterruptedException
- * thrown if the thread is interrupted while waiting for the buffers to be released
- * @throws IOException
- * thrown if an I/O error occurs while releasing the buffers
- */
- void releaseWriteBuffer(Buffer buffer) throws IOException, InterruptedException;
-
- /**
- * Checks if there is still data created by this output channel that must be transfered to the corresponding input
- * channel.
- *
- * @return <code>true</code> if the channel has data left to transmit, <code>false</code> otherwise
- * @throws InterruptedException
- * thrown if the connected task is interrupted while waiting for the remaining data to be transmitted
- * @throws IOException
- * thrown if an error occurs while transmitting the remaining data
- */
- boolean hasDataLeftToTransmit() throws IOException, InterruptedException;
-
- /**
- * Forwards the given event to the connected network input channel on a best effort basis.
- *
- * @param event
- * the event to be transferred
- * @throws InterruptedException
- * thrown if the thread is interrupted while waiting for the event to be transfered
- * @throws IOException
- * thrown if an I/O error occurs while transfering the event
- */
- void transferEventToInputChannel(AbstractEvent event) throws IOException, InterruptedException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/EndOfSuperstepEvent.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/EndOfSuperstepEvent.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/EndOfSuperstepEvent.java
deleted file mode 100644
index b0b20d4..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/EndOfSuperstepEvent.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.channels.bytebuffered;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import eu.stratosphere.nephele.event.task.AbstractEvent;
-
-/**
- * Marks the end of a superstep of one particular iteration head
- */
-public class EndOfSuperstepEvent extends AbstractEvent {
-
- public static final EndOfSuperstepEvent INSTANCE = new EndOfSuperstepEvent();
-
- @Override
- public void write(DataOutput out) throws IOException {}
-
- @Override
- public void read(DataInput in) throws IOException {}
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/InMemoryInputChannel.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/InMemoryInputChannel.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/InMemoryInputChannel.java
deleted file mode 100644
index 0609fcb..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/InMemoryInputChannel.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.channels.bytebuffered;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.nephele.io.InputGate;
-import eu.stratosphere.nephele.io.RecordDeserializer;
-import eu.stratosphere.nephele.io.channels.ChannelID;
-import eu.stratosphere.nephele.io.channels.ChannelType;
-
-public final class InMemoryInputChannel<T extends IOReadableWritable> extends AbstractByteBufferedInputChannel<T> {
-
- public InMemoryInputChannel(InputGate<T> inputGate, int channelIndex, RecordDeserializer<T> deserializer,
- ChannelID channelID, ChannelID connectedChannelID) {
- super(inputGate, channelIndex, deserializer, channelID, connectedChannelID);
- }
-
- @Override
- public ChannelType getType() {
-
- return ChannelType.INMEMORY;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/InMemoryOutputChannel.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/InMemoryOutputChannel.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/InMemoryOutputChannel.java
deleted file mode 100644
index 0113d6f..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/InMemoryOutputChannel.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.channels.bytebuffered;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.nephele.io.OutputGate;
-import eu.stratosphere.nephele.io.channels.ChannelID;
-import eu.stratosphere.nephele.io.channels.ChannelType;
-
-public final class InMemoryOutputChannel<T extends IOReadableWritable> extends AbstractByteBufferedOutputChannel<T> {
-
- public InMemoryOutputChannel(OutputGate<T> outputGate, int channelIndex, ChannelID channelID,
- ChannelID connectedChannelID) {
- super(outputGate, channelIndex, channelID, connectedChannelID);
- }
-
- @Override
- public ChannelType getType() {
-
- return ChannelType.INMEMORY;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/NetworkInputChannel.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/NetworkInputChannel.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/NetworkInputChannel.java
deleted file mode 100644
index ee11c0e..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/NetworkInputChannel.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.channels.bytebuffered;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.nephele.io.InputGate;
-import eu.stratosphere.nephele.io.RecordDeserializer;
-import eu.stratosphere.nephele.io.channels.ChannelID;
-import eu.stratosphere.nephele.io.channels.ChannelType;
-
-public final class NetworkInputChannel<T extends IOReadableWritable> extends AbstractByteBufferedInputChannel<T> {
-
- public NetworkInputChannel(InputGate<T> inputGate, int channelIndex, RecordDeserializer<T> deserializer,
- ChannelID channelID, ChannelID connectedChannelID) {
- super(inputGate, channelIndex, deserializer, channelID, connectedChannelID);
- }
-
- @Override
- public ChannelType getType() {
-
- return ChannelType.NETWORK;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/NetworkOutputChannel.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/NetworkOutputChannel.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/NetworkOutputChannel.java
deleted file mode 100644
index c98f41a..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/NetworkOutputChannel.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.channels.bytebuffered;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.nephele.io.OutputGate;
-import eu.stratosphere.nephele.io.channels.ChannelID;
-import eu.stratosphere.nephele.io.channels.ChannelType;
-
-public final class NetworkOutputChannel<T extends IOReadableWritable> extends AbstractByteBufferedOutputChannel<T> {
-
- public NetworkOutputChannel(OutputGate<T> outputGate, int channelIndex, ChannelID channelID,
- ChannelID connectedChannelID) {
- super(outputGate, channelIndex, channelID, connectedChannelID);
- }
-
- @Override
- public ChannelType getType() {
-
- return ChannelType.NETWORK;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/library/DirectoryReader.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/library/DirectoryReader.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/library/DirectoryReader.java
deleted file mode 100644
index acfddde..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/library/DirectoryReader.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.library;
-
-import java.util.Iterator;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import eu.stratosphere.core.fs.FSDataInputStream;
-import eu.stratosphere.core.fs.FileInputSplit;
-import eu.stratosphere.core.fs.FileSystem;
-import eu.stratosphere.nephele.io.RecordWriter;
-import eu.stratosphere.nephele.template.AbstractFileInputTask;
-import eu.stratosphere.nephele.types.FileRecord;
-
-public class DirectoryReader extends AbstractFileInputTask {
-
- /**
- * The record writer to write the output strings to.
- */
- private RecordWriter<FileRecord> output = null;
-
- // buffer
- private byte[] buffer;
-
- private static final Log LOG = LogFactory.getLog(DirectoryReader.class);
-
-
- @Override
- public void invoke() throws Exception {
-
- final Iterator<FileInputSplit> splitIterator = getFileInputSplits();
- FileRecord fr = null;
-
- while (splitIterator.hasNext()) {
-
- final FileInputSplit split = splitIterator.next();
-
- final long start = split.getStart();
- final long end = start + split.getLength();
-
- if (buffer == null || buffer.length < end - start) {
- buffer = new byte[(int) (end - start)];
- }
-
- if (fr == null || fr.getFileName().compareTo(split.getPath().getName()) != 0) {
- if (fr != null) {
- try {
- output.emit(fr);
- } catch (InterruptedException e) {
- // TODO: Respond to interruption properly
- LOG.error(e);
- }
- }
- fr = new FileRecord(split.getPath().getName());
- }
-
- final FileSystem fs = FileSystem.get(split.getPath().toUri());
-
- final FSDataInputStream fdis = fs.open(split.getPath());
- fdis.seek(split.getStart());
-
- int read = fdis.read(buffer, 0, buffer.length);
- if (read == -1) {
- continue;
- }
-
- fr.append(buffer, 0, read);
-
- if (read != end - start) {
- System.err.println("Unexpected number of bytes read! Expected: " + (end - start) + " Read: " + read);
- }
- }
-
- if (fr != null) {
- try {
- output.emit(fr);
- } catch (InterruptedException e) {
- // TODO: Respond to interruption properly
- LOG.error(e);
- }
- }
- }
-
-
- @Override
- public void registerInputOutput() {
- output = new RecordWriter<FileRecord>(this, FileRecord.class);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/library/DirectoryWriter.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/library/DirectoryWriter.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/library/DirectoryWriter.java
deleted file mode 100644
index 915aaa7..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/library/DirectoryWriter.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.library;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import eu.stratosphere.core.fs.FSDataOutputStream;
-import eu.stratosphere.core.fs.FileSystem;
-import eu.stratosphere.core.fs.Path;
-import eu.stratosphere.nephele.io.RecordReader;
-import eu.stratosphere.nephele.template.AbstractFileOutputTask;
-import eu.stratosphere.nephele.types.FileRecord;
-
-public class DirectoryWriter extends AbstractFileOutputTask {
-
- /**
- * The record reader to read the incoming strings from.
- */
- private RecordReader<FileRecord> input = null;
-
- private static final Log LOG = LogFactory.getLog(DirectoryWriter.class);
-
- @Override
- public void invoke() throws Exception {
-
- final Path path = getFileOutputPath();
- final FileSystem fs = path.getFileSystem();
-
- try {
- while (input.hasNext()) {
-
- final FileRecord record = input.next();
- Path newPath = new Path(path + Path.SEPARATOR + record.getFileName());
- FSDataOutputStream outputStream = fs.create(newPath, true);
-
- outputStream.write(record.getDataBuffer(), 0, record.getDataBuffer().length);
- outputStream.close();
- // TODO: Implement me
- // System.out.println(input.next());
- // ODO Auto-generated catch block
- }
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- LOG.error(e);
- }
-
- }
-
- @Override
- public void registerInputOutput() {
- input = new RecordReader<FileRecord>(this, FileRecord.class);
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/library/FileLineReader.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/library/FileLineReader.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/library/FileLineReader.java
deleted file mode 100644
index a26c56c..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/library/FileLineReader.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.library;
-
-import java.util.Iterator;
-
-import eu.stratosphere.core.fs.FSDataInputStream;
-import eu.stratosphere.core.fs.FileInputSplit;
-import eu.stratosphere.core.fs.FileSystem;
-import eu.stratosphere.core.io.StringRecord;
-import eu.stratosphere.nephele.io.RecordWriter;
-import eu.stratosphere.nephele.template.AbstractFileInputTask;
-import eu.stratosphere.runtime.fs.LineReader;
-
-/**
- * A file line reader reads the associated file input splits line by line and outputs the lines as string records.
- *
- */
-public class FileLineReader extends AbstractFileInputTask {
-
- private RecordWriter<StringRecord> output = null;
-
- @Override
- public void invoke() throws Exception {
-
- final Iterator<FileInputSplit> splitIterator = getFileInputSplits();
-
- while (splitIterator.hasNext()) {
-
- final FileInputSplit split = splitIterator.next();
-
- long start = split.getStart();
- long length = split.getLength();
-
- final FileSystem fs = FileSystem.get(split.getPath().toUri());
-
- final FSDataInputStream fdis = fs.open(split.getPath());
-
- final LineReader lineReader = new LineReader(fdis, start, length, (1024 * 1024));
-
- byte[] line = lineReader.readLine();
-
- while (line != null) {
-
- // Create a string object from the data read
- StringRecord str = new StringRecord();
- str.set(line);
-
- // Send out string
- output.emit(str);
-
- line = lineReader.readLine();
- }
-
- // Close the stream;
- lineReader.close();
- }
- }
-
- @Override
- public void registerInputOutput() {
- output = new RecordWriter<StringRecord>(this, StringRecord.class);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/library/FileLineWriter.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/library/FileLineWriter.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/library/FileLineWriter.java
deleted file mode 100644
index 5be3e1b..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/library/FileLineWriter.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.library;
-
-import eu.stratosphere.core.fs.FSDataOutputStream;
-import eu.stratosphere.core.fs.FileStatus;
-import eu.stratosphere.core.fs.FileSystem;
-import eu.stratosphere.core.fs.Path;
-import eu.stratosphere.core.io.StringRecord;
-import eu.stratosphere.nephele.io.RecordReader;
-import eu.stratosphere.nephele.template.AbstractFileOutputTask;
-
-/**
- * A file line writer reads string records its input gate and writes them to the associated output file.
- *
- */
-public class FileLineWriter extends AbstractFileOutputTask {
-
- /**
- * The record reader through which incoming string records are received.
- */
- private RecordReader<StringRecord> input = null;
-
-
- @Override
- public void invoke() throws Exception {
-
- Path outputPath = getFileOutputPath();
-
- FileSystem fs = FileSystem.get(outputPath.toUri());
- if (fs.exists(outputPath)) {
- FileStatus status = fs.getFileStatus(outputPath);
-
- if (status.isDir()) {
- outputPath = new Path(outputPath.toUri().toString() + "/file_" + getIndexInSubtaskGroup() + ".txt");
- }
- }
-
- final FSDataOutputStream outputStream = fs.create(outputPath, true);
-
- while (this.input.hasNext()) {
-
- StringRecord record = this.input.next();
- byte[] recordByte = (record.toString() + "\r\n").getBytes();
- outputStream.write(recordByte, 0, recordByte.length);
- }
-
- outputStream.close();
-
- }
-
-
- @Override
- public void registerInputOutput() {
- this.input = new RecordReader<StringRecord>(this, StringRecord.class);
- }
-
-
- @Override
- public int getMaximumNumberOfSubtasks() {
- // The default implementation always returns -1
- return -1;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/ipc/Client.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/ipc/Client.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/ipc/Client.java
index 0204ec6..ea7bd4a 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/ipc/Client.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/ipc/Client.java
@@ -19,6 +19,16 @@
package eu.stratosphere.nephele.ipc;
+import eu.stratosphere.core.io.IOReadableWritable;
+import eu.stratosphere.core.io.StringRecord;
+import eu.stratosphere.nephele.net.NetUtils;
+import eu.stratosphere.nephele.util.IOUtils;
+import eu.stratosphere.runtime.io.serialization.DataOutputSerializer;
+import eu.stratosphere.util.ClassUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.net.SocketFactory;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
@@ -31,24 +41,13 @@ import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import javax.net.SocketFactory;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.core.io.StringRecord;
-import eu.stratosphere.nephele.io.DataOutputBuffer;
-import eu.stratosphere.nephele.net.NetUtils;
-import eu.stratosphere.nephele.util.IOUtils;
-import eu.stratosphere.util.ClassUtils;
-
/**
* A client for an IPC service. IPC calls take a single {@link Writable} as a
* parameter, and return a {@link Writable} as their value. A service runs on
@@ -379,13 +378,14 @@ public class Client {
out.write(Server.HEADER.array());
// Write out the ConnectionHeader
- DataOutputBuffer buf = new DataOutputBuffer();
+ DataOutputSerializer buf = new DataOutputSerializer(4 + header.getProtocol().getBytes().length + 1);
header.write(buf);
// Write out the payload length
- int bufLen = buf.getLength();
+ ByteBuffer wrapper = buf.wrapAsByteBuffer();
+ int bufLen = wrapper.limit();
out.writeInt(bufLen);
- out.write(buf.getData().array(), 0, bufLen);
+ out.write(wrapper.array(), 0, bufLen);
}
/*
@@ -456,30 +456,27 @@ public class Client {
return;
}
- DataOutputBuffer d = null;
+ DataOutputSerializer d = null;
try {
synchronized (this.out) {
// for serializing the
// data to be written
- d = new DataOutputBuffer();
+ d = new DataOutputSerializer(64);
// First, write call id to buffer d
d.writeInt(call.id);
// Then write RPC data (the actual call) to buffer d
call.param.write(d);
- byte[] data = d.getData().array();
- int dataLength = d.getLength();
+ ByteBuffer wrapper = d.wrapAsByteBuffer();
+ byte[] data = wrapper.array();
+ int dataLength = wrapper.limit();
out.writeInt(dataLength); // first put the data length
out.write(data, 0, dataLength);// write the data
out.flush();
}
} catch (IOException e) {
markClosed(e);
- } finally {
- // the buffer is just an in-memory buffer, but it is still polite to
- // close early
- IOUtils.closeStream(d);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobVertex.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobVertex.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobVertex.java
index 6af9d74..b4c51f2 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobVertex.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobVertex.java
@@ -23,8 +23,7 @@ import eu.stratosphere.configuration.IllegalConfigurationException;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.core.io.StringRecord;
import eu.stratosphere.nephele.execution.librarycache.LibraryCacheManager;
-import eu.stratosphere.nephele.io.DistributionPattern;
-import eu.stratosphere.nephele.io.channels.ChannelType;
+import eu.stratosphere.runtime.io.channels.ChannelType;
import eu.stratosphere.nephele.template.AbstractInvokable;
import eu.stratosphere.nephele.util.EnumUtils;
import eu.stratosphere.util.StringUtils;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/DistributionPattern.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/DistributionPattern.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/DistributionPattern.java
new file mode 100644
index 0000000..3b8c9a0
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/DistributionPattern.java
@@ -0,0 +1,33 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.nephele.jobgraph;
+
+/**
+ * A distribution pattern determines which subtasks of a producing Nephele task a wired to which
+ * subtasks of a consuming subtask.
+ *
+ */
+
+public enum DistributionPattern {
+
+ /**
+ * Each subtask of the producing Nephele task is wired to each subtask of the consuming Nephele task.
+ */
+ BIPARTITE,
+
+ /**
+ * The i-th subtask of the producing Nephele task is wired to the i-th subtask of the consuming Nephele task.
+ */
+ POINTWISE
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobEdge.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobEdge.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobEdge.java
index 51396ff..45788dd 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobEdge.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobEdge.java
@@ -13,8 +13,7 @@
package eu.stratosphere.nephele.jobgraph;
-import eu.stratosphere.nephele.io.DistributionPattern;
-import eu.stratosphere.nephele.io.channels.ChannelType;
+import eu.stratosphere.runtime.io.channels.ChannelType;
/**
* Objects of this class represent edges in the user's job graph.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobID.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobID.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobID.java
index e4467a0..2ec2ed6 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobID.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobID.java
@@ -1,5 +1,5 @@
/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
@@ -13,81 +13,44 @@
package eu.stratosphere.nephele.jobgraph;
+import java.nio.ByteBuffer;
+
import javax.xml.bind.DatatypeConverter;
-import eu.stratosphere.nephele.io.AbstractID;
+import eu.stratosphere.nephele.AbstractID;
-/**
- * A class for statistically unique job IDs.
- * <p>
- * This class is thread-safe.
- *
- */
public final class JobID extends AbstractID {
- /**
- * Constructs a new random ID from a uniform distribution.
- */
public JobID() {
super();
}
- /**
- * Constructs a new job ID.
- *
- * @param lowerPart
- * the lower bytes of the ID
- * @param upperPart
- * the higher bytes of the ID
- */
- private JobID(final long lowerPart, final long upperPart) {
+ public JobID(long lowerPart, long upperPart) {
super(lowerPart, upperPart);
}
- /**
- * Constructs a new job ID from the given bytes.
- *
- * @param bytes
- * the bytes to initialize the job ID with
- */
- public JobID(final byte[] bytes) {
+ public JobID(byte[] bytes) {
super(bytes);
}
- /**
- * Generates a new statistically unique job ID.
- *
- * @return a new statistically unique job ID
- */
public static JobID generate() {
-
- final long lowerPart = AbstractID.generateRandomBytes();
- final long upperPart = AbstractID.generateRandomBytes();
+ long lowerPart = AbstractID.generateRandomLong();
+ long upperPart = AbstractID.generateRandomLong();
return new JobID(lowerPart, upperPart);
}
- /**
- * Constructs a new job ID and initializes it with the given bytes.
- *
- * @param bytes
- * the bytes to initialize the new job ID with
- * @return the new job ID
- */
- public static JobID fromByteArray(final byte[] bytes) {
-
+ public static JobID fromByteArray(byte[] bytes) {
return new JobID(bytes);
}
-
- /**
- * Constructs a new job ID and initializes it with the given bytes.
- *
- * @param bytes
- * the bytes to initialize the new job ID with
- * @return the new job ID
- */
- public static JobID fromHexString(final String hexString) {
+ public static JobID fromByteBuffer(ByteBuffer buf, int offset) {
+ long lower = buf.getLong(offset);
+ long upper = buf.getLong(offset + 8);
+ return new JobID(lower, upper);
+ }
+
+ public static JobID fromHexString(String hexString) {
return new JobID(DatatypeConverter.parseHexBinary(hexString));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobVertexID.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobVertexID.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobVertexID.java
index c6f63ca..689cc02 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobVertexID.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobVertexID.java
@@ -13,7 +13,7 @@
package eu.stratosphere.nephele.jobgraph;
-import eu.stratosphere.nephele.io.AbstractID;
+import eu.stratosphere.nephele.AbstractID;
/**
* A class for statistically unique job vertex IDs.