You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/06/10 21:35:19 UTC
[22/34] Offer buffer-oriented API for I/O (#25)
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/RuntimeInputGate.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/RuntimeInputGate.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/RuntimeInputGate.java
deleted file mode 100644
index 71cad15..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/RuntimeInputGate.java
+++ /dev/null
@@ -1,330 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.nephele.event.task.AbstractEvent;
-import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
-import eu.stratosphere.nephele.io.channels.AbstractInputChannel;
-import eu.stratosphere.nephele.io.channels.ChannelID;
-import eu.stratosphere.nephele.io.channels.bytebuffered.InMemoryInputChannel;
-import eu.stratosphere.nephele.io.channels.bytebuffered.NetworkInputChannel;
-import eu.stratosphere.nephele.jobgraph.JobID;
-
-/**
- * In Nephele input gates are a specialization of general gates and connect input channels and record readers. As
- * channels, input gates are always parameterized to a specific type of record which they can transport. In contrast to
- * output gates input gates can be associated with a {@link DistributionPattern} object which dictates the concrete
- * wiring between two groups of vertices.
- *
- * @param <T> The type of record that can be transported through this gate.
- */
-public class RuntimeInputGate<T extends IOReadableWritable> extends AbstractGate<T> implements InputGate<T> {
-
- /**
- * The log object used for debugging.
- */
- private static final Log LOG = LogFactory.getLog(InputGate.class);
-
- /**
- * The deserializer factory used to instantiate the deserializers that construct records from byte streams.
- */
- private final RecordDeserializerFactory<T> deserializerFactory;
-
- /**
- * The list of input channels attached to this input gate.
- */
- private final ArrayList<AbstractInputChannel<T>> inputChannels = new ArrayList<AbstractInputChannel<T>>();
-
- /**
- * Queue with indices of channels that store at least one available record.
- */
- private final BlockingQueue<Integer> availableChannels = new LinkedBlockingQueue<Integer>();
-
- /**
- * The listener object to be notified when a channel has at least one record available.
- */
- private final AtomicReference<RecordAvailabilityListener<T>> recordAvailabilityListener = new AtomicReference<RecordAvailabilityListener<T>>(null);
-
-
- private AbstractTaskEvent currentEvent;
-
- /**
- * If the value of this variable is set to <code>true</code>, the input gate is closed.
- */
- private boolean isClosed = false;
-
- /**
- * The channel to read from next.
- */
- private int channelToReadFrom = -1;
-
- /**
- * Constructs a new runtime input gate.
- *
- * @param jobID
- * the ID of the job this input gate belongs to
- * @param gateID
- * the ID of the gate
- * @param deserializerFactory
- * The factory used to instantiate the deserializers that construct records from byte streams.
- * @param index
- * the index assigned to this input gate at the {@link Environment} object
- */
- public RuntimeInputGate(final JobID jobID, final GateID gateID,
- final RecordDeserializerFactory<T> deserializerFactory, final int index) {
- super(jobID, gateID, index);
- this.deserializerFactory = deserializerFactory;
- }
-
- /**
- * Adds a new input channel to the input gate.
- *
- * @param inputChannel
- * the input channel to be added.
- */
- private void addInputChannel(AbstractInputChannel<T> inputChannel) {
- // in high DOPs, this can be a serious performance issue, as adding all channels and checking linearly has a
- // quadratic complexity!
- if (!this.inputChannels.contains(inputChannel)) {
- this.inputChannels.add(inputChannel);
- }
- }
-
- /**
- * Removes the input channel with the given ID from the input gate if it exists.
- *
- * @param inputChannelID
- * the ID of the channel to be removed
- */
- public void removeInputChannel(ChannelID inputChannelID) {
-
- for (int i = 0; i < this.inputChannels.size(); i++) {
-
- final AbstractInputChannel<T> inputChannel = this.inputChannels.get(i);
- if (inputChannel.getID().equals(inputChannelID)) {
- this.inputChannels.remove(i);
- return;
- }
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Cannot find output channel with ID " + inputChannelID + " to remove");
- }
- }
-
- @Override
- public boolean isInputGate() {
- return true;
- }
-
- @Override
- public int getNumberOfInputChannels() {
- return this.inputChannels.size();
- }
-
- @Override
- public AbstractInputChannel<T> getInputChannel(int pos) {
- return this.inputChannels.get(pos);
- }
-
-
- @Override
- public NetworkInputChannel<T> createNetworkInputChannel(final InputGate<T> inputGate, final ChannelID channelID,
- final ChannelID connectedChannelID) {
-
- final NetworkInputChannel<T> enic = new NetworkInputChannel<T>(inputGate, this.inputChannels.size(),
- this.deserializerFactory.createDeserializer(), channelID, connectedChannelID);
- addInputChannel(enic);
-
- return enic;
- }
-
-
- @Override
- public InMemoryInputChannel<T> createInMemoryInputChannel(final InputGate<T> inputGate, final ChannelID channelID,
- final ChannelID connectedChannelID) {
-
- final InMemoryInputChannel<T> eimic = new InMemoryInputChannel<T>(inputGate, this.inputChannels.size(),
- this.deserializerFactory.createDeserializer(), channelID, connectedChannelID);
- addInputChannel(eimic);
-
- return eimic;
- }
-
-
- @Override
- public InputChannelResult readRecord(T target) throws IOException, InterruptedException {
-
- if (this.channelToReadFrom == -1) {
- if (this.isClosed()) {
- return InputChannelResult.END_OF_STREAM;
- }
-
- if (Thread.interrupted()) {
- throw new InterruptedException();
- }
-
- this.channelToReadFrom = waitForAnyChannelToBecomeAvailable();
- }
-
- InputChannelResult result = this.getInputChannel(this.channelToReadFrom).readRecord(target);
- switch (result) {
- case INTERMEDIATE_RECORD_FROM_BUFFER: // full record and we can stay on the same channel
- return InputChannelResult.INTERMEDIATE_RECORD_FROM_BUFFER;
-
- case LAST_RECORD_FROM_BUFFER: // full record, but we must switch the channel afterwards
- this.channelToReadFrom = -1;
- return InputChannelResult.LAST_RECORD_FROM_BUFFER;
-
- case END_OF_SUPERSTEP:
- this.channelToReadFrom = -1;
- return InputChannelResult.END_OF_SUPERSTEP;
-
- case TASK_EVENT: // task event
- this.currentEvent = this.getInputChannel(this.channelToReadFrom).getCurrentEvent();
- this.channelToReadFrom = -1; // event always marks a unit as consumed
- return InputChannelResult.TASK_EVENT;
-
- case NONE: // internal event or an incomplete record that needs further chunks
- // the current unit is exhausted
- this.channelToReadFrom = -1;
- return InputChannelResult.NONE;
-
- case END_OF_STREAM: // channel is done
- this.channelToReadFrom = -1;
- return isClosed() ? InputChannelResult.END_OF_STREAM : InputChannelResult.NONE;
-
- default: // silence the compiler
- throw new RuntimeException();
- }
- }
-
- @Override
- public AbstractTaskEvent getCurrentEvent() {
- AbstractTaskEvent e = this.currentEvent;
- this.currentEvent = null;
- return e;
- }
-
- @Override
- public void notifyRecordIsAvailable(int channelIndex) {
- this.availableChannels.add(Integer.valueOf(channelIndex));
-
- RecordAvailabilityListener<T> listener = this.recordAvailabilityListener.get();
- if (listener != null) {
- listener.reportRecordAvailability(this);
- }
- }
-
- /**
- * This method returns the index of a channel which has at least
- * one record available. The method may block until at least one
- * channel has become ready.
- *
- * @return the index of the channel which has at least one record available
- */
- public int waitForAnyChannelToBecomeAvailable() throws InterruptedException {
- return this.availableChannels.take().intValue();
- }
-
-
- @Override
- public boolean isClosed() throws IOException, InterruptedException {
-
- if (this.isClosed) {
- return true;
- }
-
- for (int i = 0; i < this.getNumberOfInputChannels(); i++) {
- final AbstractInputChannel<T> inputChannel = this.inputChannels.get(i);
- if (!inputChannel.isClosed()) {
- return false;
- }
- }
-
- this.isClosed = true;
-
- return true;
- }
-
-
- @Override
- public void close() throws IOException, InterruptedException {
-
- for (int i = 0; i < this.getNumberOfInputChannels(); i++) {
- final AbstractInputChannel<T> inputChannel = this.inputChannels.get(i);
- inputChannel.close();
- }
-
- }
-
-
- @Override
- public String toString() {
- return "Input " + super.toString();
- }
-
-
- @Override
- public void publishEvent(AbstractEvent event) throws IOException, InterruptedException {
-
- // Copy event to all connected channels
- final Iterator<AbstractInputChannel<T>> it = this.inputChannels.iterator();
- while (it.hasNext()) {
- it.next().transferEvent(event);
- }
- }
-
- /**
- * Returns the {@link RecordDeserializerFactory} used by this input gate.
- *
- * @return The {@link RecordDeserializerFactory} used by this input gate.
- */
- public RecordDeserializerFactory<T> getRecordDeserializerFactory() {
- return this.deserializerFactory;
- }
-
-
- @Override
- public void releaseAllChannelResources() {
-
- final Iterator<AbstractInputChannel<T>> it = this.inputChannels.iterator();
- while (it.hasNext()) {
- it.next().releaseAllResources();
- }
- }
-
- @Override
- public void registerRecordAvailabilityListener(final RecordAvailabilityListener<T> listener) {
- if (!this.recordAvailabilityListener.compareAndSet(null, listener)) {
- throw new IllegalStateException(this.recordAvailabilityListener
- + " is already registered as a record availability listener");
- }
- }
-
- public void notifyDataUnitConsumed(int channelIndex) {
- this.channelToReadFrom = -1;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/RuntimeOutputGate.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/RuntimeOutputGate.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/RuntimeOutputGate.java
deleted file mode 100644
index 20efd94..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/RuntimeOutputGate.java
+++ /dev/null
@@ -1,333 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.nephele.event.task.AbstractEvent;
-import eu.stratosphere.nephele.io.channels.AbstractOutputChannel;
-import eu.stratosphere.nephele.io.channels.ChannelID;
-import eu.stratosphere.nephele.io.channels.ChannelType;
-import eu.stratosphere.nephele.io.channels.bytebuffered.InMemoryOutputChannel;
-import eu.stratosphere.nephele.io.channels.bytebuffered.NetworkOutputChannel;
-import eu.stratosphere.nephele.jobgraph.JobID;
-
-/**
- * In Nephele output gates are a specialization of general gates and connect
- * record writers and output channels. As channels, output gates are always
- * parameterized to a specific type of record which they can transport.
- * <p>
- * This class is in general not thread-safe.
- *
- * @param <T>
- * the type of record that can be transported through this gate
- */
-public class RuntimeOutputGate<T extends IOReadableWritable> extends AbstractGate<T> implements OutputGate<T> {
-
- /**
- * The log object used for debugging.
- */
- private static final Log LOG = LogFactory.getLog(OutputGate.class);
-
- /**
- * The list of output channels attached to this gate.
- */
- private final ArrayList<AbstractOutputChannel<T>> outputChannels = new ArrayList<AbstractOutputChannel<T>>();
-
- /**
- * Channel selector to determine which channel is supposed receive the next record.
- */
- private final ChannelSelector<T> channelSelector;
-
- /**
- * The class of the record transported through this output gate.
- */
- private final Class<T> type;
-
- /**
- * Stores whether all records passed to this output gate shall be transmitted through all connected output channels.
- */
- private final boolean isBroadcast;
-
- /**
- * Constructs a new runtime output gate.
- *
- * @param jobID
- * the ID of the job this input gate belongs to
- * @param gateID
- * the ID of the gate
- * @param inputClass
- * the class of the record that can be transported through this
- * gate
- * @param index
- * the index assigned to this output gate at the {@link Environment} object
- * @param channelSelector
- * the channel selector to be used for this output gate
- * @param isBroadcast
- * <code>true</code> if every records passed to this output gate shall be transmitted through all connected
- * output channels, <code>false</code> otherwise
- */
- public RuntimeOutputGate(final JobID jobID, final GateID gateID, final Class<T> inputClass, final int index,
- final ChannelSelector<T> channelSelector, final boolean isBroadcast) {
-
- super(jobID, gateID, index);
-
- this.isBroadcast = isBroadcast;
- this.type = inputClass;
-
- if (this.isBroadcast) {
- this.channelSelector = null;
- } else {
- if (channelSelector == null) {
- this.channelSelector = new DefaultChannelSelector<T>();
- } else {
- this.channelSelector = channelSelector;
- }
- }
- }
-
-
- @Override
- public final Class<T> getType() {
- return this.type;
- }
-
- /**
- * Adds a new output channel to the output gate.
- *
- * @param outputChannel
- * the output channel to be added.
- */
- private void addOutputChannel(AbstractOutputChannel<T> outputChannel) {
- if (!this.outputChannels.contains(outputChannel)) {
- this.outputChannels.add(outputChannel);
- }
- }
-
- /**
- * Removes the output channel with the given ID from the output gate if it
- * exists.
- *
- * @param outputChannelID
- * the ID of the channel to be removed
- */
- public void removeOutputChannel(ChannelID outputChannelID) {
-
- for (int i = 0; i < this.outputChannels.size(); i++) {
-
- final AbstractOutputChannel<T> outputChannel = this.outputChannels.get(i);
- if (outputChannel.getID().equals(outputChannelID)) {
- this.outputChannels.remove(i);
- return;
- }
- }
-
- LOG.debug("Cannot find output channel with ID " + outputChannelID + " to remove");
- }
-
- /**
- * Removes all output channels from the output gate.
- */
- public void removeAllOutputChannels() {
-
- this.outputChannels.clear();
- }
-
-
- @Override
- public boolean isInputGate() {
-
- return false;
- }
-
-
- @Override
- public int getNumberOfOutputChannels() {
-
- return this.outputChannels.size();
- }
-
- /**
- * Returns the output channel from position <code>pos</code> of the gate's
- * internal channel list.
- *
- * @param pos
- * the position to retrieve the channel from
- * @return the channel from the given position or <code>null</code> if such
- * position does not exist.
- */
- public AbstractOutputChannel<T> getOutputChannel(int pos) {
-
- if (pos < this.outputChannels.size()) {
- return this.outputChannels.get(pos);
- } else {
- return null;
- }
- }
-
-
- @Override
- public NetworkOutputChannel<T> createNetworkOutputChannel(final OutputGate<T> outputGate,
- final ChannelID channelID, final ChannelID connectedChannelID) {
-
- final NetworkOutputChannel<T> enoc = new NetworkOutputChannel<T>(outputGate, this.outputChannels.size(),
- channelID, connectedChannelID);
- addOutputChannel(enoc);
-
- return enoc;
- }
-
-
- @Override
- public InMemoryOutputChannel<T> createInMemoryOutputChannel(final OutputGate<T> outputGate,
- final ChannelID channelID, final ChannelID connectedChannelID) {
-
- final InMemoryOutputChannel<T> einoc = new InMemoryOutputChannel<T>(outputGate, this.outputChannels.size(),
- channelID, connectedChannelID);
- addOutputChannel(einoc);
-
- return einoc;
- }
-
-
- @Override
- public void requestClose() throws IOException, InterruptedException {
- // Close all output channels
- for (int i = 0; i < this.getNumberOfOutputChannels(); i++) {
- final AbstractOutputChannel<T> outputChannel = this.getOutputChannel(i);
- outputChannel.requestClose();
- }
- }
-
-
- @Override
- public boolean isClosed() throws IOException, InterruptedException {
-
- boolean allClosed = true;
-
- for (int i = 0; i < this.getNumberOfOutputChannels(); i++) {
- final AbstractOutputChannel<T> outputChannel = this.getOutputChannel(i);
- if (!outputChannel.isClosed()) {
- allClosed = false;
- }
- }
-
- return allClosed;
- }
-
-
- @Override
- public void writeRecord(final T record) throws IOException, InterruptedException {
-
- if (this.isBroadcast) {
-
- if (getChannelType() == ChannelType.INMEMORY) {
-
- final int numberOfOutputChannels = this.outputChannels.size();
- for (int i = 0; i < numberOfOutputChannels; ++i) {
- this.outputChannels.get(i).writeRecord(record);
- }
-
- } else {
-
- // Use optimization for byte buffered channels
- this.outputChannels.get(0).writeRecord(record);
- }
-
- } else {
-
- // Non-broadcast gate, use channel selector to select output channels
- final int numberOfOutputChannels = this.outputChannels.size();
- final int[] selectedOutputChannels = this.channelSelector.selectChannels(record, numberOfOutputChannels);
-
- if (selectedOutputChannels == null) {
- return;
- }
-
-
- for (int i = 0; i < selectedOutputChannels.length; ++i) {
- if (selectedOutputChannels[i] < numberOfOutputChannels) {
- final AbstractOutputChannel<T> outputChannel = this.outputChannels.get(selectedOutputChannels[i]);
- outputChannel.writeRecord(record);
- }
- }
- }
- }
-
-
- @Override
- public List<AbstractOutputChannel<T>> getOutputChannels() {
- return this.outputChannels;
- }
-
-
- @Override
- public String toString() {
- return "Output " + super.toString();
- }
-
-
- @Override
- public void publishEvent(AbstractEvent event) throws IOException, InterruptedException {
-
- // Copy event to all connected channels
- final Iterator<AbstractOutputChannel<T>> it = this.outputChannels.iterator();
- while (it.hasNext()) {
- it.next().transferEvent(event);
- }
- }
-
-
- @Override
- public void flush() throws IOException, InterruptedException {
- // Flush all connected channels
- final Iterator<AbstractOutputChannel<T>> it = this.outputChannels.iterator();
- while (it.hasNext()) {
- it.next().flush();
- }
- }
-
-
- @Override
- public boolean isBroadcast() {
-
- return this.isBroadcast;
- }
-
-
- @Override
- public ChannelSelector<T> getChannelSelector() {
-
- return this.channelSelector;
- }
-
-
- @Override
- public void releaseAllChannelResources() {
-
- final Iterator<AbstractOutputChannel<T>> it = this.outputChannels.iterator();
-
- while (it.hasNext()) {
- it.next().releaseAllResources();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/UnionRecordReader.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/UnionRecordReader.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/UnionRecordReader.java
deleted file mode 100644
index ec8ef0c..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/UnionRecordReader.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io;
-
-import java.io.IOException;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-
-public final class UnionRecordReader<T extends IOReadableWritable> extends AbstractUnionRecordReader<T> implements Reader<T> {
-
- private final Class<T> recordType;
-
- private T lookahead;
-
-
- public UnionRecordReader(MutableRecordReader<T>[] recordReaders, Class<T> recordType) {
- super(recordReaders);
- this.recordType = recordType;
- }
-
- @Override
- public boolean hasNext() throws IOException, InterruptedException {
- if (this.lookahead != null) {
- return true;
- } else {
- T record = instantiateRecordType();
- if (getNextRecord(record)) {
- this.lookahead = record;
- return true;
- } else {
- return false;
- }
- }
- }
-
- @Override
- public T next() throws IOException, InterruptedException {
- if (hasNext()) {
- T tmp = this.lookahead;
- this.lookahead = null;
- return tmp;
- } else {
- return null;
- }
- }
-
- private T instantiateRecordType() {
- try {
- return this.recordType.newInstance();
- } catch (InstantiationException e) {
- throw new RuntimeException("Cannot instantiate class '" + this.recordType.getName() + "'.", e);
- } catch (IllegalAccessException e) {
- throw new RuntimeException("Cannot instantiate class '" + this.recordType.getName() + "'.", e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/Writer.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/Writer.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/Writer.java
deleted file mode 100644
index 91fa3b7..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/Writer.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io;
-
-import java.io.IOException;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-
-/**
- * A writer that sends records.
- *
- * @param <T> The type of the record that can be emitted with this record writer.
- */
-public interface Writer<T extends IOReadableWritable> {
-
- void emit(T record) throws IOException, InterruptedException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/AbstractChannel.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/AbstractChannel.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/AbstractChannel.java
deleted file mode 100644
index b48e5f0..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/AbstractChannel.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.channels;
-
-import java.io.IOException;
-
-import eu.stratosphere.nephele.event.task.AbstractEvent;
-import eu.stratosphere.nephele.jobgraph.JobID;
-
-/**
- * An abstract base class for channel objects.
- */
-public abstract class AbstractChannel {
-
- /**
- * The ID of the channel.
- */
- private final ChannelID channelID;
-
- /**
- * The ID of the connected channel.
- */
- private final ChannelID connectedChannelID;
-
- private final int channelIndex;
-
- /**
- * Auxiliary constructor for channels
- *
- * @param channelIndex
- * the index of the channel in either the output or input gate
- * @param channelID
- * the ID of the channel
- * @param connectedChannelID
- * the ID of the channel this channel is connected to
- * @param compressionLevel
- * the level of compression to be used for this channel
- */
- protected AbstractChannel(final int channelIndex, final ChannelID channelID, final ChannelID connectedChannelID) {
- this.channelIndex = channelIndex;
- this.channelID = channelID;
- this.connectedChannelID = connectedChannelID;
- }
-
- /**
- * Returns the ID of the channel.
- *
- * @return the ID of the channel.
- */
- public ChannelID getID() {
- return this.channelID;
- }
-
- /**
- * Returns the channel's input at the associated gate.
- *
- * @return the channel's input at the associated gate
- */
- public int getChannelIndex() {
- return this.channelIndex;
- }
-
- /**
- * Returns the type of the channel.
- *
- * @return the type of the channel.
- */
- public abstract ChannelType getType();
-
- /**
- * Checks if the channel is closed, i.e. no more records can be transported through the channel.
- *
- * @return <code>true</code> if the channel is closed, <code>false</code> otherwise
- * @throws IOException
- * thrown if an error occurred while closing the channel
- * @throws InterruptedException
- * thrown if the channel is interrupted while waiting for this operation to complete
- */
- public abstract boolean isClosed() throws IOException, InterruptedException;
-
-
- public ChannelID getConnectedChannelID() {
- return this.connectedChannelID;
- }
-
-
- /**
- * Returns the ID of the job this channel belongs to.
- *
- * @return the ID of the job this channel belongs to
- */
- public abstract JobID getJobID();
-
- /**
- * Returns <code>true</code> if this channel is an input channel, <code>false</code> otherwise.
- *
- * @return <code>true</code> if this channel is an input channel, <code>false</code> otherwise
- */
- public abstract boolean isInputChannel();
-
-
- public abstract void transferEvent(AbstractEvent event) throws IOException, InterruptedException;
-
- /**
- * Releases all resources (especially buffers) which are currently allocated by this channel. This method should be
- * called in case of a task error or as a result of a cancel operation.
- */
- public abstract void releaseAllResources();
-
- /**
- * Returns the number of bytes which have been transmitted through this channel since its instantiation.
- *
- * @return the number of bytes which have been transmitted through this channel since its instantiation
- */
- public abstract long getAmountOfDataTransmitted();
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/AbstractInputChannel.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/AbstractInputChannel.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/AbstractInputChannel.java
deleted file mode 100644
index 4b88aff..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/AbstractInputChannel.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.channels;
-
-import java.io.IOException;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
-import eu.stratosphere.nephele.io.InputChannelResult;
-import eu.stratosphere.nephele.io.InputGate;
-import eu.stratosphere.nephele.jobgraph.JobID;
-
-/**
- * InputChannel is an abstract base class to all different kinds of concrete
- * input channels that can be used. Input channels are always parameterized to
- * a specific type that can be transported through the channel.
-
- * @param <T> The Type of the record that can be transported through the channel.
- */
-public abstract class AbstractInputChannel<T extends IOReadableWritable> extends AbstractChannel {
-
- private final InputGate<T> inputGate;
-
- /**
- * Constructs an input channel with a given input gate associated.
- *
- * @param inputGate
- * the input gate this channel is connected to
- * @param channelIndex
- * the index of the channel in the input gate
- * @param channelID
- * the ID of the channel
- * @param connectedChannelID
- * the ID of the channel this channel is connected to
- * @param compressionLevel
- * the level of compression to be used for this channel
- */
- protected AbstractInputChannel(final InputGate<T> inputGate, final int channelIndex, final ChannelID channelID,
- final ChannelID connectedChannelID) {
- super(channelIndex, channelID, connectedChannelID);
- this.inputGate = inputGate;
- }
-
- /**
- * Returns the input gate associated with the input channel.
- *
- * @return the input gate associated with the input channel.
- */
- public InputGate<T> getInputGate() {
- return this.inputGate;
- }
-
- /**
- * Reads a record from the input channel. If currently no record is available the method
- * returns <code>null</code>. If the channel is closed (i.e. no more records will be received), the method
- * throws an {@link EOFException}.
- *
- * @return a record that has been transported through the channel or <code>null</code> if currently no record is
- * available
- * @throws IOException
- * thrown if the input channel is already closed {@link EOFException} or a transmission error has occurred
- */
- public abstract InputChannelResult readRecord(T target) throws IOException;
-
- /**
- * Immediately closes the input channel. The corresponding output channels are
- * notified if necessary. Any remaining records in any buffers or queue is considered
- * irrelevant and is discarded.
- *
- * @throws InterruptedException
- * thrown if the thread is interrupted while waiting for the channel to close
- * @throws IOException
- * thrown if an I/O error occurs while closing the channel
- */
- public abstract void close() throws IOException, InterruptedException;
-
-
-
- @Override
- public boolean isInputChannel() {
- return true;
- }
-
-
- @Override
- public JobID getJobID() {
- return this.inputGate.getJobID();
- }
-
- public abstract AbstractTaskEvent getCurrentEvent();
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/AbstractOutputChannel.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/AbstractOutputChannel.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/AbstractOutputChannel.java
deleted file mode 100644
index 7974c24..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/AbstractOutputChannel.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.channels;
-
-import java.io.IOException;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.nephele.io.OutputGate;
-import eu.stratosphere.nephele.jobgraph.JobID;
-
-/**
- * OutputChannel is an abstract base class to all different kinds of concrete
- * output channels that can be used. Input channels are always parameterized to
- * a specific type that can be transported through the channel.
- *
- * @param <T>
- * The Type of the record that can be transported through the channel.
- */
-public abstract class AbstractOutputChannel<T extends IOReadableWritable> extends AbstractChannel {
-
- private final OutputGate<T> outputGate;
-
- /**
- * Creates a new output channel object.
- *
- * @param outputGate
- * the output gate this channel is connected to
- * @param channelIndex
- * the index of the channel in the output gate
- * @param channelID
- * the ID of the channel
- * @param connectedChannelID
- * the ID of the channel this channel is connected to
- * @param compressionLevel
- * the level of compression to be used for this channel
- */
- public AbstractOutputChannel(final OutputGate<T> outputGate, final int channelIndex, final ChannelID channelID,
- final ChannelID connectedChannelID) {
- super(channelIndex, channelID, connectedChannelID);
- this.outputGate = outputGate;
- }
-
- /**
- * Returns the output gate this channel is connected to.
- *
- * @return the output gate this channel is connected to
- */
- public OutputGate<T> getOutputGate() {
- return this.outputGate;
- }
-
- /**
- * Writes a record to the channel. The operation may block until the record
- * is completely written to the channel.
- *
- * @param record
- * the record to be written to the channel
- * @throws IOException
- * thrown if an error occurred while transmitting the record
- */
- public abstract void writeRecord(T record) throws IOException, InterruptedException;
-
- /**
- * Requests the output channel to close. After calling this method no more records can be written
- * to the channel. The channel is finally closed when all remaining data that may exist in internal buffers
- * are written to the channel.
- *
- * @throws InterruptedException
- * thrown if the thread is interrupted while requesting the close operation
- * @throws IOException
- * thrown if an I/O error occurs while requesting the close operation
- */
- public abstract void requestClose() throws IOException, InterruptedException;
-
-
-
- @Override
- public boolean isInputChannel() {
- return false;
- }
-
- public abstract void flush() throws IOException, InterruptedException;
-
-
- @Override
- public JobID getJobID() {
- return this.outputGate.getJobID();
- }
-
- /**
- * Returns <code>true</code> if this channel is connected to an output gate which operates in broadcast mode,
- * <code>false</code> otherwise.
- *
- * @return <code>true</code> if the connected output gate operates in broadcase mode, <code>false</code> otherwise
- */
- public boolean isBroadcastChannel() {
-
- return this.outputGate.isBroadcast();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/Buffer.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/Buffer.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/Buffer.java
deleted file mode 100644
index 80dfa8f..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/Buffer.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.channels;
-
-import java.io.IOException;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.WritableByteChannel;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-
-/**
- * This class represents the general buffer abstraction that is used by Nephele
- * to transport data through the network or the file system.
- * <p>
- * Buffers may be backed by actual main memory or files.
- * <p>
- * Each buffer is expected to be written and read exactly once. Initially, the every buffer is in write mode. Before
- * reading from the buffer, it must be explicitly switched to read mode.
- * <p>
- * This class is in general not thread-safe.
- *
- */
-public abstract class Buffer implements ReadableByteChannel, WritableByteChannel
-{
- /**
- * Stores whether this buffer has already been recycled.
- */
- private final AtomicBoolean isRecycled = new AtomicBoolean(false);
-
- /**
- * Constructs a new buffer object.
- *
- * @param internalBuffer
- * the concrete implementation which backs the buffer
- */
- protected Buffer()
- {}
-
- /**
- * Reads data from the buffer and writes it to the
- * given {@link WritableByteChannel} object.
- *
- * @param destination The {@link WritableByteChannel} object to write the data to
- * @return The number of bytes read from the buffer, potentially <code>0</code> or <code>-1</code to indicate the
- * end of the stream.
- * @throws IOException Thrown if an error occurs while writing to the {@link WritableByteChannel} object.
- */
-
- public abstract boolean isOpen();
-
-
-
- @Override
- public abstract void close() throws IOException;
-
- /**
- * Reads data from the given {@link ReadableByteChannel} object and
- * writes it to the buffer.
- *
- * @param source The {@link ReadableByteChannel} object to read data from.
- * @return The number of bytes written to the buffer, possibly <code>0</code>.
- * @throws IOException Thrown if an error occurs while writing data to the buffer.
- */
- public abstract int write(ReadableByteChannel source) throws IOException;
-
-
- /**
- * Returns the number of bytes which can be either still written to or read from
- * the buffer, depending whether the buffer is still in write mode or not.
- * <p>
- * If in write mode, the method returns the number of bytes which can be written to be buffer, before its capacity
- * limit is reached. In read mode, the method returns the number of bytes which can be read from the number until
- * all data previously written to the buffer is consumed.
- *
- * @return the number of bytes which can be either written to or read from the buffer
- */
- public abstract int remaining();
-
- /**
- * Checks whether data can still be written to or read from the buffer.
- *
- * @return <code>true</code> if data can be still written to or read from
- * the buffer, <code>false</code> otherwise
- */
- public boolean hasRemaining() {
- return remaining() > 0;
- }
-
- /**
- * Returns the size of the buffer. In write mode, the size of the buffer is the initial capacity
- * of the buffer. In read mode, the size of the buffer is number of bytes which have been
- * previously written to the buffer.
- *
- * @return the size of the buffer in bytes
- */
- public abstract int size();
-
- /**
- * Recycles the buffer. In case of a memory backed buffer, the internal memory buffer
- * is returned to a global buffer queue. In case of a file backed buffer, the temporary
- * file created for this buffer is deleted. A buffer can only be recycled once. Calling this method more than once
- * will therefore have no effect.
- */
- public final void recycleBuffer()
- {
- if (this.isRecycled.compareAndSet(false, true)) {
- recycle();
- }
- }
-
- protected abstract void recycle();
-
-
- /**
- * Returns whether the buffer is backed by main memory or a file.
- *
- * @return <code>true</code> if the buffer is backed by main memory
- * or <code>false</code> if it is backed by a file
- */
- public abstract boolean isBackedByMemory();
-
- /**
- * Copies the content of the buffer to the given destination buffer. The state of the source buffer is not modified
- * by this operation.
- *
- * @param destinationBuffer
- * the destination buffer to copy this buffer's content to
- * @throws IOException
- * thrown if an error occurs while copying the data
- */
- public abstract void copyToBuffer(Buffer destinationBuffer) throws IOException;
-
- /**
- * Duplicates the buffer. This operation does not duplicate the actual
- * content of the buffer, only the reading/writing state. As a result,
- * modifications to the original buffer will affect the duplicate and vice-versa.
- *
- * @return the duplicated buffer
- */
- public abstract Buffer duplicate() throws IOException, InterruptedException;
-
- /**
- * Reads data from the buffer and writes it to the
- * given {@link WritableByteChannel} object.
- *
- * @param destination The {@link WritableByteChannel} object to write the data to
- * @return The number of bytes read from the buffer, potentially <code>0</code> or <code>-1</code to indicate the
- * end of the stream.
- * @throws IOException Thrown if an error occurs while writing to the {@link WritableByteChannel} object.
- */
- public abstract int writeTo(WritableByteChannel writableByteChannel) throws IOException;
-
- /**
- * Flip buffer (exchange limit and position).
- */
- public abstract void flip();
-
- /**
- * Returns the current read/write position for relative operations.
- * @return
- */
- public abstract int position();
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/BufferFactory.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/BufferFactory.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/BufferFactory.java
deleted file mode 100644
index 001e5a7..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/BufferFactory.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.channels;
-
-import eu.stratosphere.core.memory.MemorySegment;
-
-
-public final class BufferFactory {
-
- public static MemoryBuffer createFromMemory(final int bufferSize, final MemorySegment byteBuffer,
- final MemoryBufferPoolConnector bufferPoolConnector) {
-
- return new MemoryBuffer(bufferSize, byteBuffer, bufferPoolConnector);
- }
-
- /**
- * Private constructor to prevent instantiation.
- */
- private BufferFactory() {
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/ChannelID.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/ChannelID.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/ChannelID.java
deleted file mode 100644
index a4e3ba2..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/ChannelID.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.channels;
-
-import eu.stratosphere.nephele.io.AbstractID;
-
-/**
- * A class for statistically unique channel IDs.
- *
- */
-public class ChannelID extends AbstractID {
-
- /**
- * Constructs a new, random channel ID.
- */
- public ChannelID() {
- super();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/ChannelType.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/ChannelType.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/ChannelType.java
deleted file mode 100644
index 17fc980..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/ChannelType.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.channels;
-
-/**
- * An enumeration for declaring the type of channel.
- *
- */
-public enum ChannelType {
-
- /**
- * Enumeration type for network channels.
- */
- NETWORK,
-
- /**
- * Enumeration type for in-memory channels.
- */
- INMEMORY
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/ChannelWithAccessInfo.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/ChannelWithAccessInfo.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/ChannelWithAccessInfo.java
deleted file mode 100644
index a44b1bd..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/ChannelWithAccessInfo.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.channels;
-
-import java.nio.channels.FileChannel;
-
-interface ChannelWithAccessInfo {
-
- FileChannel getChannel();
-
- FileChannel getAndIncrementReferences();
-
- /**
- * Increments the references to this channel. Returns <code>true</code>, if successful, and <code>false</code>,
- * if the channel has been disposed in the meantime.
- *
- * @return True, if successful, false, if the channel has been disposed.
- */
- boolean incrementReferences();
-
- ChannelWithPosition reserveWriteSpaceAndIncrementReferences(int spaceToReserve);
-
- /**
- * Decrements the number of references to this channel. If the number of references is zero after the
- * decrement, the channel is deleted.
- *
- * @return The number of references remaining after the decrement.
- * @throws IllegalStateException
- * Thrown, if the number of references is already zero or below.
- */
- int decrementReferences();
-
- /**
- * Disposes the channel without further notice. Tries to close it (swallowing all exceptions) and tries
- * to delete the file.
- */
- void disposeSilently();
-
- /**
- * Updates the flag which indicates whether the underlying physical file shall be deleted when it is closed. Once
- * the flag was updated to <code>false</code> it cannot be set to <code>true</code> again.
- *
- * @param deleteOnClose
- * <code>true</code> to indicate the file shall be deleted when closed, <code>false</code> otherwise
- */
- void updateDeleteOnCloseFlag(final boolean deleteOnClose);
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/ChannelWithPosition.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/ChannelWithPosition.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/ChannelWithPosition.java
deleted file mode 100644
index 5678439..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/ChannelWithPosition.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.channels;
-
-import java.nio.channels.FileChannel;
-
-/**
- * A simple encapsulation of a file channel with an offset. This object is used for purposes, where
- * the channel is accessed by multiple threads and its internal position may be changed.
- */
-public class ChannelWithPosition {
-
- private final FileChannel channel;
-
- private final long offset;
-
- ChannelWithPosition(final FileChannel channel, final long offset) {
- this.channel = channel;
- this.offset = offset;
- }
-
- public FileChannel getChannel() {
-
- return this.channel;
- }
-
- public long getOffset() {
-
- return this.offset;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/DefaultDeserializer.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/DefaultDeserializer.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/DefaultDeserializer.java
deleted file mode 100644
index 3f47fe1..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/DefaultDeserializer.java
+++ /dev/null
@@ -1,781 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.channels;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.UTFDataFormatException;
-import java.nio.BufferUnderflowException;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.nio.channels.ReadableByteChannel;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.core.memory.DataInputView;
-import eu.stratosphere.nephele.io.RecordDeserializer;
-
-/**
- * A class for deserializing a portion of binary data into records of type <code>T</code>. The internal
- * buffer grows dynamically to the size that is required for deserialization.
- *
- * @param <T>
- * The type of the record this deserialization buffer can be used for.
- */
-public class DefaultDeserializer<T extends IOReadableWritable> implements RecordDeserializer<T> {
- /**
- * The size of an integer in byte.
- */
- private static final int SIZEOFINT = 4;
-
- // --------------------------------------------------------------------------------------------
-
- /**
- * The data input buffer used for deserialization.
- */
- private final DataInputWrapper deserializationWrapper;
-
- /**
- * Buffer to reconstruct the length field.
- */
- private final ByteBuffer lengthBuf;
-
- /**
- * Temporary buffer.
- */
- private ByteBuffer tempBuffer;
-
- /**
- * The type of the record to be deserialized.
- */
- private final Class<? extends T> recordType;
-
- /**
- * Size of the record to be deserialized in bytes.
- */
- private int recordLength = -1;
-
- /**
- * Flag indicating whether to throw an exception if nothing can be read any more.
- */
- private final boolean propagateEndOfStream;
-
- // --------------------------------------------------------------------------------------------
-
- /**
- * Constructs a new deserialization buffer with the specified type.
- *
- * @param recordType
- * The type of the record to be deserialized.
- */
- public DefaultDeserializer(final Class<? extends T> recordType) {
- this(recordType, false);
- }
-
- /**
- * Constructs a new deserialization buffer with the specified type.
- *
- * @param recordType
- * The type of the record to be deserialized.
- * @param propagateEndOfStream
- * <code>True</code>, if end of stream notifications during the
- * deserialization process shall be propagated to the caller, <code>false</code> otherwise.
- */
- public DefaultDeserializer(final Class<? extends T> recordType, final boolean propagateEndOfStream) {
- this.recordType = recordType;
- this.propagateEndOfStream = propagateEndOfStream;
-
- this.lengthBuf = ByteBuffer.allocate(SIZEOFINT);
- this.lengthBuf.order(ByteOrder.BIG_ENDIAN);
-
- this.tempBuffer = ByteBuffer.allocate(128);
- this.tempBuffer.order(ByteOrder.BIG_ENDIAN);
-
- this.deserializationWrapper = new DataInputWrapper();
- this.deserializationWrapper.setArray(this.tempBuffer.array());
- }
-
- // --------------------------------------------------------------------------------------------
-
- /*
- * (non-Javadoc)
- * @see eu.stratosphere.nephele.io.RecordDeserializer#readData(java.lang.Object,
- * java.nio.channels.ReadableByteChannel)
- */
- @Override
- public T readData(T target, final ReadableByteChannel readableByteChannel) throws IOException {
- // check whether the length has already been de-serialized
- final int len;
- if (this.recordLength < 0) {
- if (readableByteChannel.read(this.lengthBuf) == -1 && this.propagateEndOfStream) {
- if (this.lengthBuf.position() == 0) {
- throw new EOFException();
- } else {
- throw new IOException("Deserialization error: Expected to read " + this.lengthBuf.remaining()
- + " more bytes of length information from the stream!");
- }
- }
-
- if (this.lengthBuf.hasRemaining()) {
- return null;
- }
-
- len = this.lengthBuf.getInt(0);
- this.lengthBuf.clear();
-
- if (this.tempBuffer.capacity() < len) {
- this.tempBuffer = ByteBuffer.allocate(len);
- this.tempBuffer.order(ByteOrder.BIG_ENDIAN);
- this.deserializationWrapper.setArray(this.tempBuffer.array());
- }
-
- // Important: limit the number of bytes that can be read into the buffer
- this.tempBuffer.position(0);
- this.tempBuffer.limit(len);
- } else {
- len = this.recordLength;
- }
-
- if (readableByteChannel.read(this.tempBuffer) == -1 && this.propagateEndOfStream) {
- throw new IOException("Deserilization error: Expected to read " + this.tempBuffer.remaining()
- + " more bytes from stream!");
- }
-
- if (this.tempBuffer.hasRemaining()) {
- this.recordLength = len;
- return null;
- } else {
- this.recordLength = -1;
- }
-
- this.deserializationWrapper.reset(len);
-
- if (target == null) {
- target = instantiateTarget();
- }
-
- // now de-serialize the target
- try {
- target.read(this.deserializationWrapper);
- return target;
- } catch (BufferUnderflowException buex) {
- throw new EOFException();
- }
- }
-
- private final T instantiateTarget() throws IOException {
- try {
- return this.recordType.newInstance();
- } catch (Exception e) {
- throw new IOException("Could not instantiate the given record type: " + e.getMessage(), e);
- }
- }
-
- /*
- * (non-Javadoc)
- * @see eu.stratosphere.nephele.io.RecordDeserializer#clear()
- */
- @Override
- public void clear() {
-
- this.recordLength = -1;
- if (this.tempBuffer != null) {
- this.tempBuffer.clear();
- }
- if (this.lengthBuf != null) {
- this.lengthBuf.clear();
- }
- }
-
- /*
- * (non-Javadoc)
- * @see eu.stratosphere.nephele.io.RecordDeserializer#hasUnfinishedData()
- */
- @Override
- public boolean hasUnfinishedData() {
- if (this.recordLength != -1) {
- return true;
- }
-
- if (this.lengthBuf.position() > 0) {
- return true;
- }
-
- return false;
- }
-
- // --------------------------------------------------------------------------------------------
-
- // private static final class DataInputWrapper implements DataInputView
- // {
- // private ByteBuffer source;
- //
- // private byte[] utfByteBuffer; // reusable byte buffer for utf-8 decoding
- // private char[] utfCharBuffer; // reusable char buffer for utf-8 decoding
- //
- //
- // void set(ByteBuffer source) {
- // this.source = source;
- // }
- //
- //
- // /* (non-Javadoc)
- // * @see java.io.DataInput#readFully(byte[])
- // */
- // @Override
- // public void readFully(byte[] b) {
- // this.source.get(b);
- // }
- //
- // /* (non-Javadoc)
- // * @see java.io.DataInput#readFully(byte[], int, int)
- // */
- // @Override
- // public void readFully(byte[] b, int off, int len) {
- // this.source.get(b, off, len);
- // }
- //
- // /* (non-Javadoc)
- // * @see java.io.DataInput#skipBytes(int)
- // */
- // @Override
- // public int skipBytes(int n) {
- // int newPos = this.source.position() + n;
- // if (newPos > this.source.limit()) {
- // newPos = this.source.limit();
- // n = newPos - this.source.position();
- // }
- // this.source.position(newPos);
- // return n;
- // }
- //
- // /* (non-Javadoc)
- // * @see java.io.DataInput#readBoolean()
- // */
- // @Override
- // public boolean readBoolean() {
- // return this.source.get() != 0;
- // }
- //
- // /* (non-Javadoc)
- // * @see java.io.DataInput#readByte()
- // */
- // @Override
- // public byte readByte() {
- // return this.source.get();
- // }
- //
- // /* (non-Javadoc)
- // * @see java.io.DataInput#readUnsignedByte()
- // */
- // @Override
- // public int readUnsignedByte() {
- // return this.source.get() & 0xff;
- // }
- //
- // /* (non-Javadoc)
- // * @see java.io.DataInput#readShort()
- // */
- // @Override
- // public short readShort() {
- // return this.source.getShort();
- // }
- //
- // /* (non-Javadoc)
- // * @see java.io.DataInput#readUnsignedShort()
- // */
- // @Override
- // public int readUnsignedShort() {
- // return this.source.getShort() & 0xffff;
- // }
- //
- // /* (non-Javadoc)
- // * @see java.io.DataInput#readChar()
- // */
- // @Override
- // public char readChar() {
- // return this.source.getChar();
- // }
- //
- // /* (non-Javadoc)
- // * @see java.io.DataInput#readInt()
- // */
- // @Override
- // public int readInt() {
- // return this.source.getInt();
- // }
- //
- // /* (non-Javadoc)
- // * @see java.io.DataInput#readLong()
- // */
- // @Override
- // public long readLong() {
- // return this.source.getLong();
- // }
- //
- // /* (non-Javadoc)
- // * @see java.io.DataInput#readFloat()
- // */
- // @Override
- // public float readFloat() {
- // return Float.intBitsToFloat(this.source.getInt());
- // }
- //
- // /* (non-Javadoc)
- // * @see java.io.DataInput#readDouble()
- // */
- // @Override
- // public double readDouble() {
- // return Double.longBitsToDouble(this.source.getLong());
- // }
- //
- // /* (non-Javadoc)
- // * @see java.io.DataInput#readLine()
- // */
- // @Override
- // public String readLine()
- // {
- // if (this.source.hasRemaining()) {
- // // read until a newline is found
- // StringBuilder bld = new StringBuilder();
- // char curr;
- // while (this.source.hasRemaining() && (curr = (char) readUnsignedByte()) != '\n') {
- // bld.append(curr);
- // }
- // // trim a trailing carriage return
- // int len = bld.length();
- // if (len > 0 && bld.charAt(len - 1) == '\r') {
- // bld.setLength(len - 1);
- // }
- // String s = bld.toString();
- // bld.setLength(0);
- // return s;
- // } else {
- // return null;
- // }
- // }
- //
- // /* (non-Javadoc)
- // * @see java.io.DataInput#readUTF()
- // */
- // @Override
- // public String readUTF() throws IOException
- // {
- // final int utflen = readUnsignedShort();
- //
- // final byte[] bytearr;
- // final char[] chararr;
- //
- // if (this.utfByteBuffer == null || this.utfByteBuffer.length < utflen) {
- // bytearr = new byte[utflen];
- // this.utfByteBuffer = bytearr;
- // } else {
- // bytearr = this.utfByteBuffer;
- // }
- // if (this.utfCharBuffer == null || this.utfCharBuffer.length < utflen) {
- // chararr = new char[utflen];
- // this.utfCharBuffer = chararr;
- // } else {
- // chararr = this.utfCharBuffer;
- // }
- //
- // int c, char2, char3;
- // int count = 0;
- // int chararr_count = 0;
- //
- // readFully(bytearr, 0, utflen);
- //
- // while (count < utflen) {
- // c = (int) bytearr[count] & 0xff;
- // if (c > 127)
- // break;
- // count++;
- // chararr[chararr_count++] = (char) c;
- // }
- //
- // while (count < utflen) {
- // c = (int) bytearr[count] & 0xff;
- // switch (c >> 4) {
- // case 0:
- // case 1:
- // case 2:
- // case 3:
- // case 4:
- // case 5:
- // case 6:
- // case 7:
- // /* 0xxxxxxx */
- // count++;
- // chararr[chararr_count++] = (char) c;
- // break;
- // case 12:
- // case 13:
- // /* 110x xxxx 10xx xxxx */
- // count += 2;
- // if (count > utflen)
- // throw new UTFDataFormatException("malformed input: partial character at end");
- // char2 = (int) bytearr[count - 1];
- // if ((char2 & 0xC0) != 0x80)
- // throw new UTFDataFormatException("malformed input around byte " + count);
- // chararr[chararr_count++] = (char) (((c & 0x1F) << 6) | (char2 & 0x3F));
- // break;
- // case 14:
- // /* 1110 xxxx 10xx xxxx 10xx xxxx */
- // count += 3;
- // if (count > utflen)
- // throw new UTFDataFormatException("malformed input: partial character at end");
- // char2 = (int) bytearr[count - 2];
- // char3 = (int) bytearr[count - 1];
- // if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80))
- // throw new UTFDataFormatException("malformed input around byte " + (count - 1));
- // chararr[chararr_count++] = (char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | ((char3 & 0x3F) << 0));
- // break;
- // default:
- // /* 10xx xxxx, 1111 xxxx */
- // throw new UTFDataFormatException("malformed input around byte " + count);
- // }
- // }
- // // The number of chars produced may be less than utflen
- // return new String(chararr, 0, chararr_count);
- // }
- //
- //
- // /* (non-Javadoc)
- // * @see eu.stratosphere.nephele.services.memorymanager.DataInputView#skipBytesToRead(int)
- // */
- // @Override
- // public void skipBytesToRead(int numBytes) throws IOException {
- // if (this.source.remaining() < numBytes) {
- // throw new EOFException();
- // } else {
- // this.source.position(this.source.position() + numBytes);
- // }
- // }
- // }
-
- private static final class DataInputWrapper implements DataInputView {
- private byte[] source;
-
- private int position;
-
- private int limit;
-
- private char[] utfCharBuffer; // reusable char buffer for utf-8 decoding
-
- void setArray(byte[] source) {
- this.source = source;
- }
-
- void reset(int limit) {
- this.position = 0;
- this.limit = limit;
- }
-
- /*
- * (non-Javadoc)
- * @see java.io.DataInput#readFully(byte[])
- */
- @Override
- public void readFully(byte[] b) throws EOFException {
- readFully(b, 0, b.length);
- }
-
- /*
- * (non-Javadoc)
- * @see java.io.DataInput#readFully(byte[], int, int)
- */
- @Override
- public void readFully(byte[] b, int off, int len) throws EOFException {
- if (this.position <= this.limit - len) {
- System.arraycopy(this.source, this.position, b, off, len);
- this.position += len;
- } else {
- throw new EOFException();
- }
- }
-
- /*
- * (non-Javadoc)
- * @see java.io.DataInput#skipBytes(int)
- */
- @Override
- public int skipBytes(int n) {
- if (n < 0) {
- throw new IllegalArgumentException("Number of bytes to skip must not be negative.");
- }
-
- int toSkip = Math.min(this.limit - this.position, n);
- this.position += toSkip;
- return toSkip;
- }
-
- /*
- * (non-Javadoc)
- * @see java.io.DataInput#readBoolean()
- */
- @Override
- public boolean readBoolean() throws EOFException {
- return readByte() != 0;
- }
-
- /*
- * (non-Javadoc)
- * @see java.io.DataInput#readByte()
- */
- @Override
- public byte readByte() throws EOFException {
- if (this.position < this.limit) {
- return this.source[this.position++];
- } else {
- throw new EOFException();
- }
- }
-
- /*
- * (non-Javadoc)
- * @see java.io.DataInput#readUnsignedByte()
- */
- @Override
- public int readUnsignedByte() throws EOFException {
- return readByte() & 0xff;
- }
-
- /*
- * (non-Javadoc)
- * @see java.io.DataInput#readShort()
- */
- @Override
- public short readShort() throws EOFException {
- if (this.position < this.limit - 1) {
- short num = (short) (
- ((this.source[this.position + 0] & 0xff) << 8) |
- ((this.source[this.position + 1] & 0xff)));
- this.position += 2;
- return num;
- } else {
- throw new EOFException();
- }
- }
-
- /*
- * (non-Javadoc)
- * @see java.io.DataInput#readUnsignedShort()
- */
- @Override
- public int readUnsignedShort() throws EOFException {
- return readShort() & 0xffff;
- }
-
- /*
- * (non-Javadoc)
- * @see java.io.DataInput#readChar()
- */
- @Override
- public char readChar() throws EOFException {
- if (this.position < this.limit - 1) {
- char c = (char) (
- ((this.source[this.position + 0] & 0xff) << 8) |
- ((this.source[this.position + 1] & 0xff)));
- this.position += 2;
- return c;
- } else {
- throw new EOFException();
- }
- }
-
- /*
- * (non-Javadoc)
- * @see java.io.DataInput#readInt()
- */
- @Override
- public int readInt() throws EOFException {
- if (this.position < this.limit - 3) {
- final int num = ((this.source[this.position + 0] & 0xff) << 24) |
- ((this.source[this.position + 1] & 0xff) << 16) |
- ((this.source[this.position + 2] & 0xff) << 8) |
- ((this.source[this.position + 3] & 0xff));
- this.position += 4;
- return num;
- } else {
- throw new EOFException();
- }
- }
-
- /*
- * (non-Javadoc)
- * @see java.io.DataInput#readLong()
- */
- @Override
- public long readLong() throws EOFException {
- if (this.position < this.limit - 7) {
- final long num = (((long) this.source[this.position + 0] & 0xff) << 56) |
- (((long) this.source[this.position + 1] & 0xff) << 48) |
- (((long) this.source[this.position + 2] & 0xff) << 40) |
- (((long) this.source[this.position + 3] & 0xff) << 32) |
- (((long) this.source[this.position + 4] & 0xff) << 24) |
- (((long) this.source[this.position + 5] & 0xff) << 16) |
- (((long) this.source[this.position + 6] & 0xff) << 8) |
- (((long) this.source[this.position + 7] & 0xff) << 0);
- this.position += 8;
- return num;
- } else {
- throw new EOFException();
- }
- }
-
- /*
- * (non-Javadoc)
- * @see java.io.DataInput#readFloat()
- */
- @Override
- public float readFloat() throws EOFException {
- return Float.intBitsToFloat(readInt());
- }
-
- /*
- * (non-Javadoc)
- * @see java.io.DataInput#readDouble()
- */
- @Override
- public double readDouble() throws EOFException {
- return Double.longBitsToDouble(readLong());
- }
-
- /*
- * (non-Javadoc)
- * @see java.io.DataInput#readLine()
- */
- @Override
- public String readLine() {
- if (this.position < this.limit) {
- // read until a newline is found
- StringBuilder bld = new StringBuilder();
- char curr;
- while (this.position < this.limit && (curr = (char) (this.source[this.position++] & 0xff)) != '\n') {
- bld.append(curr);
- }
- // trim a trailing carriage return
- int len = bld.length();
- if (len > 0 && bld.charAt(len - 1) == '\r') {
- bld.setLength(len - 1);
- }
- String s = bld.toString();
- bld.setLength(0);
- return s;
- } else {
- return null;
- }
- }
-
- /*
- * (non-Javadoc)
- * @see java.io.DataInput#readUTF()
- */
- @Override
- public String readUTF() throws IOException {
- final int utflen = readUnsignedShort();
- final int utfLimit = this.position + utflen;
-
- if (utfLimit > this.limit) {
- throw new EOFException();
- }
-
- final byte[] bytearr = this.source;
- final char[] chararr;
- if (this.utfCharBuffer == null || this.utfCharBuffer.length < utflen) {
- chararr = new char[utflen];
- this.utfCharBuffer = chararr;
- } else {
- chararr = this.utfCharBuffer;
- }
-
- int c, char2, char3;
- int count = this.position;
- int chararr_count = 0;
-
- while (count < utfLimit) {
- c = (int) bytearr[count] & 0xff;
- if (c > 127) {
- break;
- }
- count++;
- chararr[chararr_count++] = (char) c;
- }
-
- while (count < utfLimit) {
- c = (int) bytearr[count] & 0xff;
- switch (c >> 4) {
- case 0:
- case 1:
- case 2:
- case 3:
- case 4:
- case 5:
- case 6:
- case 7:
- /* 0xxxxxxx */
- count++;
- chararr[chararr_count++] = (char) c;
- break;
- case 12:
- case 13:
- /* 110x xxxx 10xx xxxx */
- count += 2;
- if (count > utfLimit) {
- throw new UTFDataFormatException("Malformed input: partial character at end");
- }
- char2 = (int) bytearr[count - 1];
- if ((char2 & 0xC0) != 0x80) {
- throw new UTFDataFormatException("Malformed input around byte " + count);
- }
- chararr[chararr_count++] = (char) (((c & 0x1F) << 6) | (char2 & 0x3F));
- break;
- case 14:
- /* 1110 xxxx 10xx xxxx 10xx xxxx */
- count += 3;
- if (count > utfLimit) {
- throw new UTFDataFormatException("Malformed input: partial character at end");
- }
- char2 = (int) bytearr[count - 2];
- char3 = (int) bytearr[count - 1];
- if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) {
- throw new UTFDataFormatException("Malformed input around byte " + (count - 1));
- }
- chararr[chararr_count++] = (char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | ((char3 & 0x3F) << 0));
- break;
- default:
- /* 10xx xxxx, 1111 xxxx */
- throw new UTFDataFormatException("Malformed input around byte " + count);
- }
- }
- // The number of chars produced may be less than utflen
- this.position += utflen;
- return new String(chararr, 0, chararr_count);
- }
-
- /*
- * (non-Javadoc)
- * @see eu.stratosphere.nephele.services.memorymanager.DataInputView#skipBytesToRead(int)
- */
- @Override
- public void skipBytesToRead(int numBytes) throws EOFException {
- if (numBytes < 0) {
- throw new IllegalArgumentException("Number of bytes to skip must not be negative.");
- } else if (this.limit - this.position < numBytes) {
- throw new EOFException();
- } else {
- this.position += numBytes;
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/DistributedChannelWithAccessInfo.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/DistributedChannelWithAccessInfo.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/DistributedChannelWithAccessInfo.java
deleted file mode 100644
index 6873c44..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/io/channels/DistributedChannelWithAccessInfo.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.io.channels;
-
-import java.io.IOException;
-import java.nio.channels.FileChannel;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import eu.stratosphere.core.fs.FileChannelWrapper;
-import eu.stratosphere.core.fs.FileSystem;
-import eu.stratosphere.core.fs.Path;
-
-final class DistributedChannelWithAccessInfo implements ChannelWithAccessInfo {
-
- /**
- * The logging object.
- */
- private static final Log LOG = LogFactory.getLog(DistributedChannelWithAccessInfo.class);
-
- private final FileSystem fs;
-
- private final Path checkpointFile;
-
- private final FileChannelWrapper channel;
-
- private final AtomicLong reservedWritePosition;
-
- private final AtomicInteger referenceCounter;
-
- private final AtomicBoolean deleteOnClose;
-
- DistributedChannelWithAccessInfo(final FileSystem fs, final Path checkpointFile, final int bufferSize,
- final boolean deleteOnClose) throws IOException {
-
- this.fs = fs;
- this.checkpointFile = checkpointFile;
- this.channel = new FileChannelWrapper(fs, checkpointFile, bufferSize, (short) 2);
- this.reservedWritePosition = new AtomicLong(0L);
- this.referenceCounter = new AtomicInteger(0);
- this.deleteOnClose = new AtomicBoolean(deleteOnClose);
- }
-
-
- @Override
- public FileChannel getChannel() {
-
- return this.channel;
- }
-
-
- @Override
- public FileChannel getAndIncrementReferences() {
-
- if (incrementReferences()) {
- return this.channel;
- } else {
- return null;
- }
- }
-
- @Override
- public ChannelWithPosition reserveWriteSpaceAndIncrementReferences(final int spaceToReserve) {
-
- if (incrementReferences()) {
- return new ChannelWithPosition(this.channel, this.reservedWritePosition.getAndAdd(spaceToReserve));
- } else {
- return null;
- }
- }
-
-
- @Override
- public int decrementReferences() {
-
- int current = this.referenceCounter.get();
- while (true) {
- if (current <= 0) {
- // this is actually an error case, because the channel was deleted before
- throw new IllegalStateException("The references to the file were already at zero.");
- }
-
- if (current == 1) {
- // this call decrements to zero, so mark it as deleted
- if (this.referenceCounter.compareAndSet(current, Integer.MIN_VALUE)) {
- current = 0;
- break;
- }
- } else if (this.referenceCounter.compareAndSet(current, current - 1)) {
- current = current - 1;
- break;
- }
- current = this.referenceCounter.get();
- }
-
- if (current > 0) {
- return current;
- } else if (current == 0) {
- // delete the channel
- this.referenceCounter.set(Integer.MIN_VALUE);
- this.reservedWritePosition.set(Long.MIN_VALUE);
- try {
- this.channel.close();
- if (this.deleteOnClose.get()) {
- this.fs.delete(this.checkpointFile, false);
- }
-
- } catch (IOException ioex) {
- if (LOG.isErrorEnabled()) {
- LOG.error("Error while closing spill file for file buffers: " + ioex.getMessage(), ioex);
- }
- }
- return current;
- } else {
- throw new IllegalStateException("The references to the file were already at zero.");
- }
- }
-
-
- @Override
- public boolean incrementReferences() {
-
- int current = this.referenceCounter.get();
- while (true) {
- // check whether it was disposed in the meantime
- if (current < 0) {
- return false;
- }
- // atomically check and increment
- if (this.referenceCounter.compareAndSet(current, current + 1)) {
- return true;
- }
- current = this.referenceCounter.get();
- }
- }
-
-
- @Override
- public void disposeSilently() {
-
- this.referenceCounter.set(Integer.MIN_VALUE);
- this.reservedWritePosition.set(Long.MIN_VALUE);
-
- if (this.channel.isOpen()) {
- try {
- this.channel.close();
- if (this.deleteOnClose.get()) {
- this.fs.delete(this.checkpointFile, false);
- }
- } catch (Throwable t) {
- }
- }
- }
-
-
- @Override
- public void updateDeleteOnCloseFlag(final boolean deleteOnClose) {
-
- this.deleteOnClose.compareAndSet(true, deleteOnClose);
- }
-}