You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/06/10 21:35:20 UTC
[23/34] Offer buffer-oriented API for I/O (#25)
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/AbstractRecordWriter.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/AbstractRecordWriter.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/AbstractRecordWriter.java
deleted file mode 100644
index 9d50e9c..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/AbstractRecordWriter.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io;
-
-import java.io.IOException;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
-import eu.stratosphere.nephele.event.task.EventListener;
-import eu.stratosphere.nephele.execution.Environment;
-import eu.stratosphere.nephele.io.channels.bytebuffered.EndOfSuperstepEvent;
-import eu.stratosphere.nephele.template.AbstractInvokable;
-
-/**
- * Abstract base class for a regular record writer and broadcast record writer.
- *
- * @param <T> The type of the record that can be emitted with this record writer.
- */
-public abstract class AbstractRecordWriter<T extends IOReadableWritable> implements Writer<T> {
-
- /**
- * The output gate assigned to this record writer.
- */
- private OutputGate<T> outputGate;
-
- /**
- * The environment associated to this record writer.
- */
- private Environment environment;
-
- /**
- * Constructs a new record writer and registers a new output gate with the application's environment.
- *
- * @param invokable
- * the application that instantiated the record writer
- * @param outputClass
- * the class of records that can be emitted with this record writer
- * @param selector
- * the channel selector to be used to determine the output channel to be used for a record
- * @param isBroadcast
- * <code>true</code> if this record writer shall broadcast the records to all connected channels,
- * <code>false</code> otherwise
- */
- public AbstractRecordWriter(AbstractInvokable invokable, Class<T> outputClass, ChannelSelector<T> selector, boolean isBroadcast) {
- this.environment = invokable.getEnvironment();
- connectOutputGate(outputClass, selector, isBroadcast);
- }
-
- /**
- * Connects a record writer to an output gate.
- *
- * @param outputClass
- * the class of the record that can be emitted with this record writer
- * @param selector
- * the channel selector to be used to determine the output channel to be used for a record
- * @param isBroadcast
- * <code>true</code> if this record writer shall broadcast the records to all connected channels,
- * <code>false</code> otherwise
- */
- private void connectOutputGate(Class<T> outputClass, ChannelSelector<T> selector, boolean isBroadcast)
- {
- GateID gateID = this.environment.getNextUnboundOutputGateID();
- if (gateID == null) {
- gateID = new GateID();
- }
-
- this.outputGate = this.environment.createOutputGate(gateID, outputClass, selector, isBroadcast);
- this.environment.registerOutputGate(this.outputGate);
- }
-
- /**
- * This method emits a record to the corresponding output gate. The method may block
- * until the record was transferred via any of the connected channels.
- *
- * @param record
- * The record to be emitted.
- * @throws IOException
- * Thrown on an error that may happen during the transfer of the given record or a previous record.
- */
- public void emit(final T record) throws IOException, InterruptedException {
- this.outputGate.writeRecord(record);
- }
-
- /**
- * Subscribes the listener object to receive events of the given type.
- *
- * @param eventListener
- * the listener object to register
- * @param eventType
- * the type of event to register the listener for
- */
- public void subscribeToEvent(EventListener eventListener, Class<? extends AbstractTaskEvent> eventType) {
- this.outputGate.subscribeToEvent(eventListener, eventType);
- }
-
- /**
- * Removes the subscription for events of the given type for the listener object.
- *
- * @param eventListener
- * the listener object to cancel the subscription for
- * @param eventType
- * the type of the event to cancel the subscription for
- */
- public void unsubscribeFromEvent(EventListener eventListener, Class<? extends AbstractTaskEvent> eventType) {
- this.outputGate.unsubscribeFromEvent(eventListener, eventType);
- }
-
- /**
- * Publishes an event.
- *
- * @param event
- * the event to be published
- * @throws IOException
- * thrown if an error occurs while transmitting the event
- * @throws InterruptedException
- * thrown if the thread is interrupted while waiting for the event to be published
- */
- public void publishEvent(AbstractTaskEvent event) throws IOException, InterruptedException {
- this.outputGate.publishEvent(event);
- }
-
- public void flush() throws IOException, InterruptedException {
- this.outputGate.flush();
- }
-
- public void sendEndOfSuperstep() throws IOException, InterruptedException {
- this.outputGate.publishEvent(EndOfSuperstepEvent.INSTANCE);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/AbstractSingleGateRecordReader.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/AbstractSingleGateRecordReader.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/AbstractSingleGateRecordReader.java
deleted file mode 100644
index 8def532..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/AbstractSingleGateRecordReader.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io;
-
-import java.io.IOException;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
-import eu.stratosphere.nephele.execution.Environment;
-import eu.stratosphere.nephele.template.AbstractInvokable;
-
-/**
- * This is an abstract base class for a record reader, either dealing with mutable or immutable records.
- *
- * @param <T> The type of the record that can be read from this record reader.
- */
-public abstract class AbstractSingleGateRecordReader<T extends IOReadableWritable> extends AbstractRecordReader {
-
- /**
- * The input gate associated with the record reader.
- */
- protected final InputGate<T> inputGate;
-
- // --------------------------------------------------------------------------------------------
-
- protected AbstractSingleGateRecordReader(AbstractInvokable invokable, RecordDeserializerFactory<T> deserializerFactory, int inputGateID) {
- Environment environment = invokable.getEnvironment();
- GateID gateID = environment.getNextUnboundInputGateID();
- if (gateID == null) {
- gateID = new GateID();
- }
-
- this.inputGate = environment.createInputGate(gateID, deserializerFactory);
- environment.registerInputGate(this.inputGate);
- }
-
- /**
- * Returns the number of input channels wired to this reader's input gate.
- *
- * @return the number of input channels wired to this reader's input gate
- */
- public int getNumberOfInputChannels() {
- return this.inputGate.getNumberOfInputChannels();
- }
-
- /**
- * Publishes an event.
- *
- * @param event
- * the event to be published
- * @throws IOException
- * thrown if an error occurs while transmitting the event
- * @throws InterruptedException
- * thrown if the thread is interrupted while waiting for the event to be published
- */
- @Override
- public void publishEvent(AbstractTaskEvent event) throws IOException, InterruptedException {
- // Delegate call to input gate to send events
- this.inputGate.publishEvent(event);
- }
-
-
- InputGate<T> getInputGate() {
- return this.inputGate;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/AbstractUnionRecordReader.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/AbstractUnionRecordReader.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/AbstractUnionRecordReader.java
deleted file mode 100644
index 5de3f67..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/AbstractUnionRecordReader.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io;
-
-import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.HashSet;
-import java.util.Set;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
-
-public abstract class AbstractUnionRecordReader<T extends IOReadableWritable> extends AbstractRecordReader implements RecordAvailabilityListener<T> {
-
- /**
- * The set of all input gates.
- */
- private final InputGate<T>[] allInputGates;
-
- /**
- * The set of unclosed input gates.
- */
- private final Set<InputGate<T>> remainingInputGates;
-
- /**
- * Queue of input gates that have reported at least one available record.
- */
- private final ArrayDeque<InputGate<T>> availableInputGates = new ArrayDeque<InputGate<T>>();
-
- /**
- * The next input gate to read a record from.
- */
- private InputGate<T> nextInputGateToReadFrom;
-
-
- @Override
- public boolean isInputClosed() {
- return this.remainingInputGates.isEmpty();
- }
-
- /**
- * Constructs a new mutable union record reader.
- *
- * @param recordReaders
- * the individual mutable record readers whose input is used to construct the union
- */
- @SuppressWarnings("unchecked")
- protected AbstractUnionRecordReader(MutableRecordReader<T>[] recordReaders) {
-
- if (recordReaders == null) {
- throw new IllegalArgumentException("Provided argument recordReaders is null");
- }
-
- if (recordReaders.length < 2) {
- throw new IllegalArgumentException(
- "The mutable union record reader must at least be initialized with two individual mutable record readers");
- }
-
- this.allInputGates = new InputGate[recordReaders.length];
- this.remainingInputGates = new HashSet<InputGate<T>>((int) (recordReaders.length * 1.6f));
-
- for (int i = 0; i < recordReaders.length; i++) {
- InputGate<T> inputGate = recordReaders[i].getInputGate();
- inputGate.registerRecordAvailabilityListener(this);
- this.allInputGates[i] = inputGate;
- this.remainingInputGates.add(inputGate);
- }
- }
-
-
- @Override
- public void publishEvent(AbstractTaskEvent event) throws IOException, InterruptedException {
- for (InputGate<T> gate : this.allInputGates) {
- gate.publishEvent(event);
- }
- }
-
- @Override
- public void reportRecordAvailability(InputGate<T> inputGate) {
- synchronized (this.availableInputGates) {
- this.availableInputGates.add(inputGate);
- this.availableInputGates.notifyAll();
- }
- }
-
- protected boolean getNextRecord(T target) throws IOException, InterruptedException {
-
- while (true) {
- // has the current input gate more data?
- if (this.nextInputGateToReadFrom == null) {
- if (this.remainingInputGates.isEmpty()) {
- return false;
- }
-
- this.nextInputGateToReadFrom = getNextAvailableInputGate();
- }
-
- InputChannelResult result = this.nextInputGateToReadFrom.readRecord(target);
- switch (result) {
- case INTERMEDIATE_RECORD_FROM_BUFFER: // record is available and we can stay on the same channel
- return true;
-
- case LAST_RECORD_FROM_BUFFER: // record is available, but we need to re-check the channels
- this.nextInputGateToReadFrom = null;
- return true;
-
- case END_OF_SUPERSTEP:
- this.nextInputGateToReadFrom = null;
- if (incrementEndOfSuperstepEventAndCheck()) {
- return false; // end of the superstep
- }
- else {
- break; // leave the switch; the enclosing loop then waits for the next record/event
- }
-
- case TASK_EVENT: // event for the subscribers is available
- handleEvent(this.nextInputGateToReadFrom.getCurrentEvent());
- this.nextInputGateToReadFrom = null;
- break;
-
- case END_OF_STREAM: // one gate is empty
- this.remainingInputGates.remove(this.nextInputGateToReadFrom);
- this.nextInputGateToReadFrom = null;
- break;
-
- case NONE: // gate processed an internal event and could not return a record on this call
- this.nextInputGateToReadFrom = null;
- break;
- }
- }
- }
-
- private InputGate<T> getNextAvailableInputGate() throws InterruptedException {
- synchronized (this.availableInputGates) {
- while (this.availableInputGates.isEmpty()) {
- this.availableInputGates.wait();
- }
- return this.availableInputGates.pop();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/BroadcastRecordWriter.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/BroadcastRecordWriter.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/BroadcastRecordWriter.java
deleted file mode 100644
index f2b1141..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/BroadcastRecordWriter.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.nephele.template.AbstractInputTask;
-import eu.stratosphere.nephele.template.AbstractTask;
-
-/**
- * A record writer connects the application to an output gate. It allows the application
- * to emit (send out) records to the output gate. The broadcast record writer will make sure that each emitted record will be
- * transferred via all connected output channels.
- *
- * @param <T>
- * the type of the record that can be emitted with this record writer
- */
-public class BroadcastRecordWriter<T extends IOReadableWritable> extends AbstractRecordWriter<T> {
-
- /**
- * Constructs a new broadcast record writer and registers a new output gate with the application's environment.
- *
- * @param taskBase
- * the application that instantiated the record writer
- * @param outputClass
- * the class of records that can be emitted with this record writer
- */
- public BroadcastRecordWriter(AbstractTask taskBase, Class<T> outputClass) {
- super(taskBase, outputClass, null, true);
- }
-
- /**
- * Constructs a new broadcast record writer and registers a new output gate with the application's environment.
- *
- * @param inputBase
- * the application that instantiated the record writer
- * @param outputClass
- * the class of records that can be emitted with this record writer
- */
- public BroadcastRecordWriter(AbstractInputTask<?> inputBase, Class<T> outputClass) {
- super(inputBase, outputClass, null, true);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/ChannelSelector.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/ChannelSelector.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/ChannelSelector.java
deleted file mode 100644
index f7a7e9b..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/ChannelSelector.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-
-/**
- * Objects implementing this interface are passed to an {@link OutputGate}. When a record is sent through the output
- * gate, the channel selector object is called to determine to which {@link AbstractOutputChannel} objects the record
- * shall be passed on.
- *
- * @param <T>
- * the type of record which is sent through the attached output gate
- */
-public interface ChannelSelector<T extends IOReadableWritable> {
-
- /**
- * Called to determine to which attached {@link AbstractOutputChannel} objects the given record shall be forwarded.
- *
- * @param record
- * the record to determine the output channels for
- * @param numberOfOutputChannels
- * the total number of output channels which are attached to the respective output gate
- * @return a (possibly empty) array of integer numbers which indicate the indices of the output channels through
- * which the record shall be forwarded
- */
- int[] selectChannels(T record, int numberOfOutputChannels);
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/DataOutputBuffer.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/DataOutputBuffer.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/DataOutputBuffer.java
deleted file mode 100644
index 171e985..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/DataOutputBuffer.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-/**
- * This file is based on source code from the Hadoop Project (http://hadoop.apache.org/), licensed by the Apache
- * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
- * additional information regarding copyright ownership.
- */
-
-package eu.stratosphere.nephele.io;
-
-import java.io.DataInput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-
-/**
- * A reusable {@link java.io.DataOutput} implementation that writes to an in-memory
- * buffer.
- * <p>
- * This saves memory over creating a new DataOutputStream and ByteArrayOutputStream each time data is written.
- * <p>
- * Typical usage is something like the following:
- *
- * <pre>
- *
- * DataOutputBuffer buffer = new DataOutputBuffer();
- * while (... loop condition ...) {
- * buffer.reset();
- * ... write buffer using DataOutput methods ...
- * byte[] data = buffer.getData();
- * int dataLength = buffer.getLength();
- * ... write data to its ultimate destination ...
- * }
- * </pre>
- */
-public class DataOutputBuffer extends DataOutputStream {
-
- private static class ByteBufferedOutputStream extends OutputStream {
-
- private ByteBuffer buf;
-
- public ByteBuffer getData() {
- return this.buf;
- }
-
- public int getLength() {
- return this.buf.limit();
- }
-
- public ByteBufferedOutputStream() {
- this(1024);
- }
-
- public ByteBufferedOutputStream(int size) {
- this.buf = ByteBuffer.allocate(size);
- this.buf.position(0);
- this.buf.limit(0);
- }
-
- public void reset() {
- this.buf.position(0);
- this.buf.limit(0);
- }
-
- public void write(DataInput in, int len) throws IOException {
-
- final int newcount = this.buf.limit() + len;
- if (newcount > this.buf.capacity()) {
- final ByteBuffer newBuf = ByteBuffer.allocate(Math.max(this.buf.capacity() << 1, newcount));
- newBuf.position(0);
- System.arraycopy(this.buf.array(), 0, newBuf.array(), 0, this.buf.limit());
- newBuf.limit(this.buf.limit());
- this.buf = newBuf;
- }
-
- in.readFully(this.buf.array(), this.buf.limit(), len);
- this.buf.limit(newcount);
- }
-
- @Override
- public void write(byte[] b, int off, int len) throws IOException {
-
- final int newcount = this.buf.limit() + len;
- if (newcount > this.buf.capacity()) {
- increaseInternalBuffer(newcount);
- }
-
- System.arraycopy(b, off, this.buf.array(), this.buf.limit(), len);
- this.buf.limit(newcount);
- }
-
- @Override
- public void write(byte[] b) throws IOException {
- write(b, 0, b.length);
- }
-
- @Override
- public void write(int arg0) throws IOException {
-
- final int oldLimit = this.buf.limit();
- final int newLimit = oldLimit + 1;
-
- if (newLimit > this.buf.capacity()) {
- increaseInternalBuffer(newLimit);
- }
-
- this.buf.limit(newLimit);
- this.buf.put(oldLimit, (byte) arg0);
- }
-
- private void increaseInternalBuffer(int minimumRequiredSize) {
- final ByteBuffer newBuf = ByteBuffer.allocate(Math.max(this.buf.capacity() << 1, minimumRequiredSize));
- newBuf.position(0);
- System.arraycopy(this.buf.array(), 0, newBuf.array(), 0, this.buf.limit());
- newBuf.limit(this.buf.limit());
- this.buf = newBuf;
- }
- }
-
- private final ByteBufferedOutputStream byteBufferedOutputStream;
-
- /** Constructs a new empty buffer. */
- public DataOutputBuffer() {
- this(new ByteBufferedOutputStream());
- }
-
- public DataOutputBuffer(int size) {
- this(new ByteBufferedOutputStream(size));
- }
-
- private DataOutputBuffer(ByteBufferedOutputStream byteBufferedOutputStream) {
- super(byteBufferedOutputStream);
- this.byteBufferedOutputStream = byteBufferedOutputStream;
- }
-
- public ByteBuffer getData() {
- return this.byteBufferedOutputStream.getData();
- }
-
- public int getLength() {
- return this.byteBufferedOutputStream.getLength();
- }
-
- /** Resets the buffer to empty. */
- public DataOutputBuffer reset() {
- this.byteBufferedOutputStream.reset();
- return this;
- }
-
- public void write(DataInput in, int length) throws IOException {
- this.byteBufferedOutputStream.write(in, length);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/DefaultChannelSelector.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/DefaultChannelSelector.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/DefaultChannelSelector.java
deleted file mode 100644
index cabc208..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/DefaultChannelSelector.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-
-/**
- * This is the default implementation of the {@link ChannelSelector} interface. It represents a simple round-robin
- * strategy, i.e. regardless of the record, exactly one of the attached output channels is selected at a time.
-
- * @param <T>
- * the type of record which is sent through the attached output gate
- */
-public class DefaultChannelSelector<T extends IOReadableWritable> implements ChannelSelector<T> {
-
- /**
- * Stores the index of the channel to send the next record to.
- */
- private final int[] nextChannelToSendTo = new int[1];
-
- /**
- * Constructs a new default channel selector.
- */
- public DefaultChannelSelector() {
- this.nextChannelToSendTo[0] = 0;
- }
-
-
- @Override
- public int[] selectChannels(final T record, final int numberOfOutpuChannels) {
-
- this.nextChannelToSendTo[0] = (this.nextChannelToSendTo[0] + 1) % numberOfOutpuChannels;
-
- return this.nextChannelToSendTo;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/DistributionPattern.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/DistributionPattern.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/DistributionPattern.java
deleted file mode 100644
index 634fbcc..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/DistributionPattern.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io;
-
-/**
- * A distribution pattern determines which subtasks of a producing Nephele task are wired to which
- * subtasks of a consuming Nephele task.
- *
- */
-
-public enum DistributionPattern {
-
- /**
- * Each subtask of the producing Nephele task is wired to each subtask of the consuming Nephele task.
- */
- BIPARTITE,
-
- /**
- * The i-th subtask of the producing Nephele task is wired to the i-th subtask of the consuming Nephele task.
- */
- POINTWISE
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/Gate.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/Gate.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/Gate.java
deleted file mode 100644
index 26773ca..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/Gate.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io;
-
-import java.io.IOException;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.nephele.event.task.AbstractEvent;
-import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
-import eu.stratosphere.nephele.event.task.EventListener;
-import eu.stratosphere.nephele.io.channels.ChannelType;
-import eu.stratosphere.nephele.jobgraph.JobID;
-
-public interface Gate<T extends IOReadableWritable> {
-
- /**
- * Returns the index that has been assigned to the gate upon initialization.
- *
- * @return the index that has been assigned to the gate upon initialization.
- */
- int getIndex();
-
- /**
- * Subscribes the listener object to receive events of the given type.
- *
- * @param eventListener
- * the listener object to register
- * @param eventType
- * the type of event to register the listener for
- */
- void subscribeToEvent(EventListener eventListener, Class<? extends AbstractTaskEvent> eventType);
-
- /**
- * Removes the subscription for events of the given type for the listener object.
- *
- * @param eventListener
- * the listener object to cancel the subscription for
- * @param eventType
- * the type of the event to cancel the subscription for
- */
- void unsubscribeFromEvent(EventListener eventListener, Class<? extends AbstractTaskEvent> eventType);
-
- /**
- * Publishes an event.
- *
- * @param event
- * the event to be published
- * @throws IOException
- * thrown if an error occurs while transmitting the event
- * @throws InterruptedException
- * thrown if the thread is interrupted while waiting for the event to be published
- */
- void publishEvent(AbstractEvent event) throws IOException, InterruptedException;
-
- /**
- * Passes a received event on to the event notification manager so it can be dispatched.
- *
- * @param event
- * the event to pass on to the notification manager
- */
- void deliverEvent(AbstractTaskEvent event);
-
- /**
- * Returns the ID of the job this gate belongs to.
- *
- * @return the ID of the job this gate belongs to
- */
- JobID getJobID();
-
- /**
- * Returns the type of the input/output channels which are connected to this gate.
- *
- * @return the type of input/output channels which are connected to this gate
- */
- ChannelType getChannelType();
-
- /**
- * Returns the ID of the gate.
- *
- * @return the ID of the gate
- */
- GateID getGateID();
-
- /**
- * Releases the allocated resources (particularly buffers) of all channels attached to this gate. This method
- * should only be called after the respective task has stopped running.
- */
- void releaseAllChannelResources();
-
- /**
- * Checks if the gate is closed. The gate is closed if all of its associated channels are closed.
- *
- * @return <code>true</code> if the gate is closed, <code>false</code> otherwise
- * @throws IOException
- * thrown if any error occurred while closing the gate
- * @throws InterruptedException
- * thrown if the gate is interrupted while waiting for this operation to complete
- */
- boolean isClosed() throws IOException, InterruptedException;
-
- /**
- * Checks if the considered gate is an input gate.
- *
- * @return <code>true</code> if the considered gate is an input gate, <code>false</code> if it is an output gate
- */
- boolean isInputGate();
-
- /**
- * Sets the type of the input/output channels which are connected to this gate.
- *
- * @param channelType
- * the type of input/output channels which are connected to this gate
- */
- void setChannelType(ChannelType channelType);
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/GateID.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/GateID.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/GateID.java
deleted file mode 100644
index 9998916..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/GateID.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io;
-
-/**
- * A class for statistically unique gate IDs.
- *
- */
-public final class GateID extends AbstractID {
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/ImmutableRecordDeserializerFactory.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/ImmutableRecordDeserializerFactory.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/ImmutableRecordDeserializerFactory.java
deleted file mode 100644
index be0a60f..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/ImmutableRecordDeserializerFactory.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.nephele.io.channels.DefaultDeserializer;
-
-/**
- * A simple factory implementation that instantiates deserializers for immutable records. For
- * each deserialization, a new record is instantiated from the given class.
- */
-public class ImmutableRecordDeserializerFactory<T extends IOReadableWritable> implements RecordDeserializerFactory<T> {
-
- private final Class<? extends T> recordType; // the type of the record to be deserialized
-
-
- /**
- * Creates a new factory that instantiates deserializers for immutable records.
- *
- * @param recordType The type of the record to be deserialized.
- */
- public ImmutableRecordDeserializerFactory(final Class<? extends T> recordType) {
- this.recordType = recordType;
- }
-
- @Override
- public RecordDeserializer<T> createDeserializer() {
- return new DefaultDeserializer<T>(this.recordType);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/InputChannelResult.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/InputChannelResult.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/InputChannelResult.java
deleted file mode 100644
index f154a3f..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/InputChannelResult.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-package eu.stratosphere.nephele.io;
-
-public enum InputChannelResult {
-
- NONE,
- INTERMEDIATE_RECORD_FROM_BUFFER,
- LAST_RECORD_FROM_BUFFER,
- END_OF_SUPERSTEP,
- TASK_EVENT,
- END_OF_STREAM;
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/InputGate.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/InputGate.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/InputGate.java
deleted file mode 100644
index 6a57756..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/InputGate.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io;
-
-import java.io.IOException;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
-import eu.stratosphere.nephele.io.channels.AbstractInputChannel;
-import eu.stratosphere.nephele.io.channels.ChannelID;
-import eu.stratosphere.nephele.io.channels.bytebuffered.InMemoryInputChannel;
-import eu.stratosphere.nephele.io.channels.bytebuffered.NetworkInputChannel;
-
-/**
- * @param <T> The type of record that can be transported through this gate.
- */
-public interface InputGate<T extends IOReadableWritable> extends Gate<T> {
-
- /**
- * Reads a record from one of the associated input channels. Channels are read such that one buffer from a channel is
- * consecutively consumed. The buffers in turn are consumed in the order in which they arrive.
- * Note that this method is not guaranteed to return a record, because the currently available channel data may not always
- * constitute an entire record, when events or partial records are part of the data.
- *
- * When called even though no data is available, this call will block until data is available, so this method should be called
- * when waiting is desired (such as when synchronously consuming a single gate) or only when it is known that data is available
- * (such as when reading a union of multiple input gates).
- *
- * @param target The record object into which to construct the complete record.
- * @return The result indicating whether a complete record is available, an event is available, only incomplete data
- * is available (NONE), or the gate is exhausted.
- * @throws IOException Thrown when an error occurred in the network stack relating to this channel.
- * @throws InterruptedException Thrown, when the thread working on this channel is interrupted.
- */
- InputChannelResult readRecord(T target) throws IOException, InterruptedException;
-
- /**
- * Returns the number of input channels associated with this input gate.
- *
- * @return the number of input channels associated with this input gate
- */
- int getNumberOfInputChannels();
-
- /**
- * Returns the input channel from position <code>pos</code> of the gate's internal channel list.
- *
- * @param pos
- * the position to retrieve the channel from
- * @return the channel from the given position or <code>null</code> if such position does not exist.
- */
- AbstractInputChannel<T> getInputChannel(int pos);
-
- /**
- * Notify the gate that the channel with the given index has
- * at least one record available.
- *
- * @param channelIndex
- * the index of the channel which has at least one record available
- */
- void notifyRecordIsAvailable(int channelIndex);
-
- /**
- * Notify the gate that it has consumed a data unit from the channel with the given index
- *
- * @param channelIndex
- * the index of the channel from which a data unit has been consumed
- */
- void notifyDataUnitConsumed(int channelIndex);
-
- /**
- * Immediately closes the input gate and all its input channels. The corresponding
- * output channels are notified. Any remaining records in any buffers or queues are considered
- * irrelevant and are discarded.
- *
- * @throws IOException
- * thrown if an I/O error occurs while closing the gate
- * @throws InterruptedException
- * thrown if the thread is interrupted while waiting for the gate to be closed
- */
- void close() throws IOException, InterruptedException;
-
- /**
- * Creates a new network input channel and assigns it to the given input gate.
- *
- * @param inputGate
- * the input gate the channel shall be assigned to
- * @param channelID
- * the ID of the channel
- * @param connectedChannelID
- * the ID of the channel this channel is connected to
- * @param compressionLevel
- * the level of compression to be used for this channel
- * @return the new network input channel
- */
- NetworkInputChannel<T> createNetworkInputChannel(InputGate<T> inputGate, ChannelID channelID,
- ChannelID connectedChannelID);
-
-
- /**
- * Creates a new in-memory input channel and assigns it to the given input gate.
- *
- * @param inputGate
- * the input gate the channel shall be assigned to
- * @param channelID
- * the ID of the channel
- * @param connectedChannelID
- * the ID of the channel this channel is connected to
- * @param compressionLevel
- * the level of compression to be used for this channel
- * @return the new in-memory input channel
- */
- InMemoryInputChannel<T> createInMemoryInputChannel(InputGate<T> inputGate, ChannelID channelID,
- ChannelID connectedChannelID);
-
- /**
- * Registers a {@link RecordAvailabilityListener} with this input gate.
- *
- * @param listener
- * the listener object to be registered
- */
- void registerRecordAvailabilityListener(RecordAvailabilityListener<T> listener);
-
-
- AbstractTaskEvent getCurrentEvent();
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/MutableReader.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/MutableReader.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/MutableReader.java
deleted file mode 100644
index ef1080c..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/MutableReader.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io;
-
-import java.io.IOException;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-
-/**
- *
- */
-public interface MutableReader<T extends IOReadableWritable> extends ReaderBase {
-
- /**
- * @param target
- * @return
- * @throws IOException
- * @throws InterruptedException
- */
- boolean next(T target) throws IOException, InterruptedException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/MutableRecordDeserializerFactory.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/MutableRecordDeserializerFactory.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/MutableRecordDeserializerFactory.java
deleted file mode 100644
index 5890562..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/MutableRecordDeserializerFactory.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.nephele.io.channels.DefaultDeserializer;
-
-/**
- * A simple factory implementation that instantiates deserializers for mutable records.
- */
-public class MutableRecordDeserializerFactory<T extends IOReadableWritable> implements RecordDeserializerFactory<T> {
-
- /**
- * Creates a new factory that instantiates deserializers for mutable records.
- *
- * @param recordType The type of the record to be deserialized.
- */
- public MutableRecordDeserializerFactory() {}
-
- @Override
- public RecordDeserializer<T> createDeserializer() {
- return new DefaultDeserializer<T>(null);
- }
-
- // --------------------------------------------------------------------------------------------
-
- private static final RecordDeserializerFactory<IOReadableWritable> INSTANCE =
- new MutableRecordDeserializerFactory<IOReadableWritable>();
-
- /**
- * Gets the singleton instance of the {@code MutableRecordDeserializerFactory}.
- *
- * @param <E> The generic type of the record to be deserialized.
- * @return An instance of the factory.
- */
- public static final <E extends IOReadableWritable> RecordDeserializerFactory<E> get() {
- @SuppressWarnings("unchecked")
- RecordDeserializerFactory<E> toReturn = (RecordDeserializerFactory<E>) INSTANCE;
- return toReturn;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/MutableRecordReader.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/MutableRecordReader.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/MutableRecordReader.java
deleted file mode 100644
index 23c26c4..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/MutableRecordReader.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io;
-
-import java.io.IOException;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.nephele.template.AbstractOutputTask;
-import eu.stratosphere.nephele.template.AbstractTask;
-
-public class MutableRecordReader<T extends IOReadableWritable> extends AbstractSingleGateRecordReader<T> implements MutableReader<T> {
-
- private boolean endOfStream;
-
-
- /**
- * Constructs a new mutable record reader and registers a new input gate with the application's environment.
- *
- * @param taskBase The application that instantiated the record reader.
- */
- public MutableRecordReader(final AbstractTask taskBase) {
- super(taskBase, MutableRecordDeserializerFactory.<T>get(), 0);
- }
-
- /**
- * Constructs a new record reader and registers a new input gate with the application's environment.
- *
- * @param outputBase The application that instantiated the record reader.
- */
- public MutableRecordReader(final AbstractOutputTask outputBase) {
- super(outputBase, MutableRecordDeserializerFactory.<T>get(), 0);
- }
-
- /**
- * Constructs a new record reader and registers a new input gate with the application's environment.
- *
- * @param taskBase
- * the application that instantiated the record reader
- * @param inputGateID
- * The ID of the input gate that the reader reads from.
- */
- public MutableRecordReader(final AbstractTask taskBase, final int inputGateID) {
- super(taskBase, MutableRecordDeserializerFactory.<T>get(), inputGateID);
- }
-
- /**
- * Constructs a new record reader and registers a new input gate with the application's environment.
- *
- * @param outputBase
- * the application that instantiated the record reader
- * @param inputGateID
- * The ID of the input gate that the reader reads from.
- */
- public MutableRecordReader(final AbstractOutputTask outputBase, final int inputGateID) {
- super(outputBase, MutableRecordDeserializerFactory.<T>get(), inputGateID);
- }
-
- // --------------------------------------------------------------------------------------------
-
- @Override
- public boolean next(final T target) throws IOException, InterruptedException {
- if (this.endOfStream) {
- return false;
-
- }
- while (true) {
- InputChannelResult result = this.inputGate.readRecord(target);
- switch (result) {
- case INTERMEDIATE_RECORD_FROM_BUFFER:
- case LAST_RECORD_FROM_BUFFER:
- return true;
-
- case END_OF_SUPERSTEP:
- if (incrementEndOfSuperstepEventAndCheck()) {
- return false; // end of the superstep
- }
- else {
- break; // fall through and wait for next record/event
- }
-
- case TASK_EVENT:
- handleEvent(this.inputGate.getCurrentEvent());
- break; // fall through to get next record
-
- case END_OF_STREAM:
- this.endOfStream = true;
- return false;
-
- default:
- ; // fall through to get next record
- }
- }
- }
-
- @Override
- public boolean isInputClosed() {
- return this.endOfStream;
- }
-
- @Override
- public void setIterative(int numEventsUntilEndOfSuperstep) {
- // sanity check for debug purposes
- if (numEventsUntilEndOfSuperstep != getNumberOfInputChannels()) {
- throw new IllegalArgumentException("Number of events till end of superstep is different from the number of input channels.");
- }
- super.setIterative(numEventsUntilEndOfSuperstep);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/MutableUnionRecordReader.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/MutableUnionRecordReader.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/MutableUnionRecordReader.java
deleted file mode 100644
index 1fc8dd2..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/MutableUnionRecordReader.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io;
-
-import java.io.IOException;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-
-public class MutableUnionRecordReader<T extends IOReadableWritable> extends AbstractUnionRecordReader<T> implements MutableReader<T> {
-
-
- /**
- * Constructs a new mutable union record reader.
- *
- * @param recordReaders
- * the individual mutable record readers whose input is used to construct the union
- */
- public MutableUnionRecordReader(MutableRecordReader<T>[] recordReaders) {
- super(recordReaders);
- }
-
- @Override
- public boolean next(T target) throws IOException, InterruptedException {
- return getNextRecord(target);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/OutputGate.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/OutputGate.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/OutputGate.java
deleted file mode 100644
index 0bbe5e0..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/OutputGate.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io;
-
-import java.io.IOException;
-import java.util.List;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.nephele.io.channels.AbstractOutputChannel;
-import eu.stratosphere.nephele.io.channels.ChannelID;
-import eu.stratosphere.nephele.io.channels.bytebuffered.InMemoryOutputChannel;
-import eu.stratosphere.nephele.io.channels.bytebuffered.NetworkOutputChannel;
-
-/**
- * In Nephele output gates are a specialization of general gates and connect
- * record writers and output channels. As channels, output gates are always
- * parameterized to a specific type of record which they can transport.
- *
- * @param <T>
- * the type of record that can be transported through this gate
- */
-public interface OutputGate<T extends IOReadableWritable> extends Gate<T> {
-
- /**
- * Returns the type of record that can be transported through this gate.
- *
- * @return the type of record that can be transported through this gate
- */
- Class<T> getType();
-
- /**
- * Writes a record to one of the associated output channels. Currently, the
- * channels are chosen in a simple round-robin fashion. This operation may
- * block until the respective channel has received the data.
- *
- * @param record
- * the record to be written
- * @throws IOException
- * thrown if any error occurs during channel I/O
- */
- void writeRecord(T record) throws IOException, InterruptedException;
-
- /**
- * Returns all the OutputChannels connected to this gate
- *
- * @return the list of OutputChannels connected to this gate
- */
- List<AbstractOutputChannel<T>> getOutputChannels();
-
- /**
- * Flushes all connected output channels.
- *
- * @throws IOException
- * thrown if an error occurs while flushing an output channel
- * @throws InterruptedException
- * thrown if the thread is interrupted while waiting for the data to be flushed
- */
- void flush() throws IOException, InterruptedException;
-
- /**
- * Checks if this output gate operates in broadcast mode, i.e. all records passed to it are transferred through all
- * connected output channels.
- *
- * @return <code>true</code> if this output gate operates in broadcast mode, <code>false</code> otherwise
- */
- boolean isBroadcast();
-
- /**
- * Returns the number of output channels associated with this output gate.
- *
- * @return the number of output channels associated with this output gate
- */
- int getNumberOfOutputChannels();
-
- /**
- * Returns the output channel from position <code>pos</code> of the gate's
- * internal channel list.
- *
- * @param pos
- * the position to retrieve the channel from
- * @return the channel from the given position or <code>null</code> if such
- * position does not exist.
- */
- AbstractOutputChannel<T> getOutputChannel(int pos);
-
- /**
- * Returns the output gate's channel selector.
- *
- * @return the output gate's channel selector or <code>null</code> if the gate operates in broadcast mode
- */
- ChannelSelector<T> getChannelSelector();
-
- /**
- * Requests the output gate to close. This means the application will send
- * no records through this gate anymore.
- *
- * @throws IOException
- * @throws InterruptedException
- */
- void requestClose() throws IOException, InterruptedException;
-
- /**
- * Removes all output channels from the output gate.
- */
- void removeAllOutputChannels();
-
- /**
- * Creates a new network output channel and assigns it to the given output gate.
- *
- * @param outputGate
- * the output gate the channel shall be assigned to
- * @param channelID
- * the ID of the channel
- * @param connectedChannelID
- * the ID of the channel this channel is connected to
- * @param compressionLevel
- * the level of compression to be used for this channel
- * @return the new network output channel
- */
- NetworkOutputChannel<T> createNetworkOutputChannel(OutputGate<T> outputGate, ChannelID channelID,
- ChannelID connectedChannelID);
-
- /**
- * Creates a new in-memory output channel and assigns it to the given output gate.
- *
- * @param outputGate
- * the output gate the channel shall be assigned to
- * @param channelID
- * the ID of the channel
- * @param connectedChannelID
- * the ID of the channel this channel is connected to
- * @param compressionLevel
- * the level of compression to be used for this channel
- * @return the new in-memory output channel
- */
- InMemoryOutputChannel<T> createInMemoryOutputChannel(OutputGate<T> outputGate, ChannelID channelID,
- ChannelID connectedChannelID);
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/Reader.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/Reader.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/Reader.java
deleted file mode 100644
index 80d2010..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/Reader.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io;
-
-import java.io.IOException;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-
-/**
- * A reader interface to read records from an input.
- *
- * @param <T> The type of the record that can be read with this reader
- */
-public interface Reader<T extends IOReadableWritable> extends ReaderBase {
-
- boolean hasNext() throws IOException, InterruptedException;
-
- T next() throws IOException, InterruptedException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/ReaderBase.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/ReaderBase.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/ReaderBase.java
deleted file mode 100644
index ae69ad0..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/ReaderBase.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-package eu.stratosphere.nephele.io;
-
-import java.io.IOException;
-
-import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
-import eu.stratosphere.nephele.event.task.EventListener;
-
-
-/**
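- * Base interface for readers: exposes the input-closed state, task event subscription and publication, and superstep control.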
- */
-public interface ReaderBase {
-
- boolean isInputClosed();
-
- /**
- * Subscribes the listener object to receive events of the given type.
- *
- * @param eventListener
- * the listener object to register
- * @param eventType
- * the type of event to register the listener for
- */
- void subscribeToEvent(EventListener eventListener, Class<? extends AbstractTaskEvent> eventType);
-
- /**
- * Removes the subscription for events of the given type for the listener object.
- *
- * @param eventListener
- * the listener object to cancel the subscription for
- * @param eventType
- * the type of the event to cancel the subscription for
- */
- void unsubscribeFromEvent(final EventListener eventListener, final Class<? extends AbstractTaskEvent> eventType);
-
- /**
- * Publishes an event.
- *
- * @param event
- * the event to be published
- * @throws IOException
- * thrown if an error occurs while transmitting the event
- * @throws InterruptedException
- * thrown if the thread is interrupted while waiting for the event to be published
- */
- void publishEvent(AbstractTaskEvent event) throws IOException, InterruptedException;
-
-
- void setIterative(int numEventsUntilEndOfSuperstep);
-
-
- void startNextSuperstep();
-
- boolean hasReachedEndOfSuperstep();
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/RecordAvailabilityListener.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/RecordAvailabilityListener.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/RecordAvailabilityListener.java
deleted file mode 100644
index d22f048..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/RecordAvailabilityListener.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-
-/**
- * This interface can be implemented by a class which shall be notified by an input gate when one of its connected
- * input channels has at least one record available for reading.
- *
- * @param <T>
- * the type of record transported through the corresponding input gate
- */
-public interface RecordAvailabilityListener<T extends IOReadableWritable> {
-
- /**
- * This method is called by an input gate when one of its connected input channels has at least one record available
- * for reading.
- *
- * @param inputGate
- * the input gate which has at least one record available
- */
- void reportRecordAvailability(InputGate<T> inputGate);
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/RecordDeserializer.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/RecordDeserializer.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/RecordDeserializer.java
deleted file mode 100644
index e8ccb77..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/RecordDeserializer.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io;
-
-import java.io.IOException;
-import java.nio.channels.ReadableByteChannel;
-
-/**
- * This interface must be implemented by classes which transform byte streams back into {@link Record} objects.
- *
- * @param <T> The type of record this record deserializer works with.
- */
-public interface RecordDeserializer<T>
-{
- /**
- * Transforms a record back from a readable byte channel. The deserialization might not complete, because the channel
- * has not all required data available. In that case, this method must return {@code null}. Furthermore, it may
- * not retain a reference to the given target object in that case, but must manage to put the data aside.
- *
- * @param target The record object into which to deserialize the data. May be null for deserializers
- * that operate on immutable objects, in which case the deserializer has to instantiate an
- * object. In the case where this object is non-null, but the deserialization does not complete,
- * the object must not be used to cache the partial state, as it is not guaranteed that the object
- * will remain unchanged until the next attempt to continue the deserialization.
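- * @param readableByteChannel The readable byte channel which contains the record's data.
- * @return The record deserialized from <code>readableByteChannel</code>, or null, if the record could not be completely deserialized because the channel did not have all required data available.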
- * @throws IOException Thrown if an I/O error occurs while deserializing the record from the stream
- */
- T readData(final T target, final ReadableByteChannel readableByteChannel) throws IOException;
-
- /**
- * Clears the internal buffers of the deserializer and resets its state.
- */
- void clear();
-
- /**
- * Checks whether the deserializer has data from a previous deserialization attempt stored in its internal buffers which
- * is not yet finished.
- *
- * @return <code>true</code>, if the deserializer's internal buffers contain data from a previous deserialization
- * attempt, <code>false</code> otherwise.
- */
- boolean hasUnfinishedData();
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/RecordDeserializerFactory.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/RecordDeserializerFactory.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/RecordDeserializerFactory.java
deleted file mode 100644
index eadd326..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/RecordDeserializerFactory.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io;
-
-/**
- * A simple factory to instantiate record deserializer objects. Since a deserializer might be stateful, the system
- * must be able to instantiate an arbitrary number of them, equal to the number of data channels.
- *
- * If the created deserializers are in fact not stateful, the factory should return a shared object.
- */
-public interface RecordDeserializerFactory<T>
-{
- /**
- * Creates a new instance of the deserializer. The returned instance may not share any state with
- * any previously returned instance.
- *
- * @return An instance of the deserializer.
- */
- RecordDeserializer<T> createDeserializer();
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/RecordReader.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/RecordReader.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/RecordReader.java
deleted file mode 100644
index 5010013..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/RecordReader.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io;
-
-import java.io.IOException;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.nephele.template.AbstractOutputTask;
-import eu.stratosphere.nephele.template.AbstractTask;
-
-/**
- * A record reader connects an input gate to an application. It allows the application
- * to query for incoming records and read them from the input gate.
- *
- * @param <T> The type of the record that can be read from this record reader.
- */
-public class RecordReader<T extends IOReadableWritable> extends AbstractSingleGateRecordReader<T> implements Reader<T> {
-
- private final Class<T> recordType;
-
- /**
- * Stores the last read record.
- */
- private T lookahead;
-
- /**
- * Stores whether no more records will be received from the assigned input gate.
- */
- private boolean noMoreRecordsWillFollow;
-
- // --------------------------------------------------------------------------------------------
-
- /**
- * Constructs a new record reader and registers a new input gate with the application's environment.
- *
- * @param taskBase
- * The application that instantiated the record reader.
- * @param recordType
- * The class of records that can be read from the record reader.
- */
- public RecordReader(AbstractTask taskBase, Class<T> recordType) {
- super(taskBase, MutableRecordDeserializerFactory.<T>get(), 0);
- this.recordType = recordType;
- }
-
- /**
- * Constructs a new record reader and registers a new input gate with the application's environment.
- *
- * @param outputBase
- * The application that instantiated the record reader.
- * @param recordType
- * The class of records that can be read from the record reader.
- */
- public RecordReader(AbstractOutputTask outputBase, Class<T> recordType) {
- super(outputBase, MutableRecordDeserializerFactory.<T>get(), 0);
- this.recordType = recordType;
- }
-
- // --------------------------------------------------------------------------------------------
-
- /**
- * Checks if at least one more record can be read from the associated input gate. This method may block
- * until the associated input gate is able to read the record from one of its input channels.
- *
- * @return <code>true</code> if at least one more record can be read from the associated input gate, otherwise
- * <code>false</code>
- */
- @Override
- public boolean hasNext() throws IOException, InterruptedException{
- if (this.lookahead != null) {
- return true;
- } else {
- if (this.noMoreRecordsWillFollow) {
- return false;
- }
-
- T record = instantiateRecordType();
-
- while (true) {
- InputChannelResult result = this.inputGate.readRecord(record);
- switch (result) {
- case INTERMEDIATE_RECORD_FROM_BUFFER:
- case LAST_RECORD_FROM_BUFFER:
- this.lookahead = record;
- return true;
-
- case END_OF_SUPERSTEP:
- if (incrementEndOfSuperstepEventAndCheck()) {
- return false;
- }
- else {
- break; // fall through and wait for next record/event
- }
-
- case TASK_EVENT:
- handleEvent(this.inputGate.getCurrentEvent());
- break;
-
- case END_OF_STREAM:
- this.noMoreRecordsWillFollow = true;
- return false;
-
- default:
- ; // fall through the loop
- }
- }
- }
- }
-
- /**
- * Reads the current record from the associated input gate.
- *
- * @return the current record from the associated input gate.
- * @throws IOException
- * thrown if any error occurs while reading the record from the input gate
- */
- @Override
- public T next() throws IOException, InterruptedException {
- if (hasNext()) {
- T tmp = this.lookahead;
- this.lookahead = null;
- return tmp;
- } else {
- return null;
- }
- }
-
- @Override
- public boolean isInputClosed() {
- return this.noMoreRecordsWillFollow;
- }
-
- private T instantiateRecordType() {
- try {
- return this.recordType.newInstance();
- } catch (InstantiationException e) {
- throw new RuntimeException("Cannot instantiate class '" + this.recordType.getName() + "'.", e);
- } catch (IllegalAccessException e) {
- throw new RuntimeException("Cannot instantiate class '" + this.recordType.getName() + "'.", e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/RecordWriter.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/RecordWriter.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/RecordWriter.java
deleted file mode 100644
index 2f4c21b..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/RecordWriter.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.nephele.template.AbstractInputTask;
-import eu.stratosphere.nephele.template.AbstractTask;
-
-/**
- * A record writer connects the application to an output gate. It allows the application
- * to emit (send out) records to the output gate. The output gate will then take care of distributing
- * the emitted records among the output channels.
- *
- * @param <T>
- * the type of the record that can be emitted with this record writer
- */
-public class RecordWriter<T extends IOReadableWritable> extends AbstractRecordWriter<T> {
-
- /**
- * Constructs a new record writer and registers a new output gate with the application's environment.
- *
- * @param taskBase
- * the application that instantiated the record writer
- * @param outputClass
- * the class of records that can be emitted with this record writer
- * @param selector
- * the channel selector to be used to determine the output channel to be used for a record
- */
- public RecordWriter(AbstractTask taskBase, Class<T> outputClass, ChannelSelector<T> selector) {
- super(taskBase, outputClass, selector, false);
- }
-
- /**
- * Constructs a new record writer and registers a new output gate with the application's environment.
- *
- * @param taskBase
- * the application that instantiated the record writer
- * @param outputClass
- * the class of records that can be emitted with this record writer
- */
- public RecordWriter(AbstractTask taskBase, Class<T> outputClass) {
- super(taskBase, outputClass, null, false);
- }
-
- /**
- * Constructs a new record writer and registers a new output gate with the
- * application's environment.
- *
- * @param inputBase
- * the application that instantiated the record writer
- * @param outputClass
- * the class of records that can be emitted with this record writer
- */
- public RecordWriter(AbstractInputTask<?> inputBase, Class<T> outputClass) {
- super(inputBase, outputClass, null, false);
- }
-
- /**
- * Constructs a new record writer and registers a new output gate with the application's environment.
- *
- * @param inputBase
- * the application that instantiated the record writer
- * @param outputClass
- * the class of records that can be emitted with this record writer
- * @param selector
- * the channel selector to be used to determine the output channel to be used for a record
- */
- public RecordWriter(AbstractInputTask<?> inputBase, Class<T> outputClass, ChannelSelector<T> selector) {
- super(inputBase, outputClass, selector, false);
- }
-}