You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/06/09 20:30:45 UTC
[10/30] Offer buffer-oriented API for I/O (#25)
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/JobManagerITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/JobManagerITCase.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/JobManagerITCase.java
index 965a5aa..06c857e 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/JobManagerITCase.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/JobManagerITCase.java
@@ -13,44 +13,46 @@
package eu.stratosphere.nephele.jobmanager;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
import eu.stratosphere.configuration.ConfigConstants;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.configuration.GlobalConfiguration;
import eu.stratosphere.core.fs.Path;
import eu.stratosphere.nephele.client.JobClient;
import eu.stratosphere.nephele.client.JobExecutionException;
-import eu.stratosphere.nephele.io.DistributionPattern;
-import eu.stratosphere.nephele.io.channels.ChannelType;
-import eu.stratosphere.nephele.io.library.FileLineReader;
-import eu.stratosphere.nephele.io.library.FileLineWriter;
+import eu.stratosphere.nephele.jobgraph.DistributionPattern;
+import eu.stratosphere.runtime.io.channels.ChannelType;
import eu.stratosphere.nephele.jobgraph.JobFileInputVertex;
import eu.stratosphere.nephele.jobgraph.JobFileOutputVertex;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.nephele.jobgraph.JobGraphDefinitionException;
import eu.stratosphere.nephele.jobgraph.JobTaskVertex;
import eu.stratosphere.nephele.jobmanager.JobManager.ExecutionMode;
+import eu.stratosphere.nephele.taskmanager.Task;
import eu.stratosphere.nephele.taskmanager.TaskManager;
-import eu.stratosphere.nephele.taskmanager.runtime.RuntimeTask;
+import eu.stratosphere.nephele.util.FileLineReader;
+import eu.stratosphere.nephele.util.FileLineWriter;
import eu.stratosphere.nephele.util.JarFileCreator;
import eu.stratosphere.nephele.util.ServerTestUtils;
import eu.stratosphere.util.LogUtils;
+import eu.stratosphere.util.StringUtils;
+import org.apache.log4j.ConsoleAppender;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.PatternLayout;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
/**
* This test is intended to cover the basic functionality of the {@link JobManager}.
@@ -185,8 +187,8 @@ public class JobManagerITCase {
// connect vertices
try {
i1.connectTo(t1, ChannelType.NETWORK);
- t1.connectTo(t2, ChannelType.INMEMORY);
- t2.connectTo(o1, ChannelType.INMEMORY);
+ t1.connectTo(t2, ChannelType.IN_MEMORY);
+ t2.connectTo(o1, ChannelType.IN_MEMORY);
} catch (JobGraphDefinitionException e) {
e.printStackTrace();
}
@@ -286,8 +288,8 @@ public class JobManagerITCase {
o1.setVertexToShareInstancesWith(i1);
// connect vertices
- i1.connectTo(t1, ChannelType.INMEMORY);
- t1.connectTo(o1, ChannelType.INMEMORY);
+ i1.connectTo(t1, ChannelType.IN_MEMORY);
+ t1.connectTo(o1, ChannelType.IN_MEMORY);
// add jar
jg.addJar(new Path(new File(ServerTestUtils.getTempDir() + File.separator + exceptionClassName + ".jar")
@@ -297,7 +299,7 @@ public class JobManagerITCase {
jobClient = new JobClient(jg, configuration);
// deactivate logging of expected test exceptions
- Logger rtLogger = Logger.getLogger(RuntimeTask.class);
+ Logger rtLogger = Logger.getLogger(Task.class);
Level rtLevel = rtLogger.getEffectiveLevel();
rtLogger.setLevel(Level.OFF);
@@ -382,8 +384,8 @@ public class JobManagerITCase {
o1.setVertexToShareInstancesWith(i1);
// connect vertices
- i1.connectTo(t1, ChannelType.INMEMORY);
- t1.connectTo(o1, ChannelType.INMEMORY);
+ i1.connectTo(t1, ChannelType.IN_MEMORY);
+ t1.connectTo(o1, ChannelType.IN_MEMORY);
// add jar
jg.addJar(new Path(new File(ServerTestUtils.getTempDir() + File.separator + runtimeExceptionClassName
@@ -492,8 +494,8 @@ public class JobManagerITCase {
// connect vertices
try {
i1.connectTo(t1, ChannelType.NETWORK);
- t1.connectTo(t2, ChannelType.INMEMORY);
- t2.connectTo(o1, ChannelType.INMEMORY);
+ t1.connectTo(t2, ChannelType.IN_MEMORY);
+ t2.connectTo(o1, ChannelType.IN_MEMORY);
} catch (JobGraphDefinitionException e) {
e.printStackTrace();
}
@@ -583,9 +585,9 @@ public class JobManagerITCase {
o1.setVertexToShareInstancesWith(i1);
// connect vertices
- i1.connectTo(t1, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
+ i1.connectTo(t1, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
i1.connectTo(t1, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
- t1.connectTo(o1, ChannelType.INMEMORY);
+ t1.connectTo(o1, ChannelType.IN_MEMORY);
// add jar
jg.addJar(new Path(jarFile.toURI()));
@@ -657,7 +659,7 @@ public class JobManagerITCase {
o1.setVertexToShareInstancesWith(i1);
// connect vertices
- i1.connectTo(o1, ChannelType.INMEMORY);
+ i1.connectTo(o1, ChannelType.IN_MEMORY);
// add jar
jg.addJar(new Path(jarFile.toURI()));
@@ -751,9 +753,9 @@ public class JobManagerITCase {
u1.setVertexToShareInstancesWith(o1);
// connect vertices
- i1.connectTo(u1, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
- i2.connectTo(u1, ChannelType.INMEMORY);
- u1.connectTo(o1, ChannelType.INMEMORY);
+ i1.connectTo(u1, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
+ i2.connectTo(u1, ChannelType.IN_MEMORY);
+ u1.connectTo(o1, ChannelType.IN_MEMORY);
// add jar
jg.addJar(new Path(jarFile.toURI()));
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/UnionTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/UnionTask.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/UnionTask.java
index 3b95133..124a24d 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/UnionTask.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/UnionTask.java
@@ -14,9 +14,9 @@
package eu.stratosphere.nephele.jobmanager;
import eu.stratosphere.core.io.StringRecord;
-import eu.stratosphere.nephele.io.MutableRecordReader;
-import eu.stratosphere.nephele.io.RecordWriter;
-import eu.stratosphere.nephele.io.UnionRecordReader;
+import eu.stratosphere.runtime.io.api.MutableRecordReader;
+import eu.stratosphere.runtime.io.api.RecordWriter;
+import eu.stratosphere.runtime.io.api.UnionRecordReader;
import eu.stratosphere.nephele.template.AbstractTask;
/**
@@ -41,13 +41,17 @@ public class UnionTask extends AbstractTask {
recordReaders[1] = new MutableRecordReader<StringRecord>(this);
this.unionReader = new UnionRecordReader<StringRecord>(recordReaders, StringRecord.class);
- this.writer = new RecordWriter<StringRecord>(this, StringRecord.class);
+ this.writer = new RecordWriter<StringRecord>(this);
}
@Override
public void invoke() throws Exception {
+ this.writer.initializeSerializers();
+
while (this.unionReader.hasNext()) {
this.writer.emit(this.unionReader.next());
}
+
+ this.writer.flush();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/scheduler/queue/QueueSchedulerTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/scheduler/queue/QueueSchedulerTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/scheduler/queue/QueueSchedulerTest.java
index 9fc4256..f1e3191 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/scheduler/queue/QueueSchedulerTest.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/scheduler/queue/QueueSchedulerTest.java
@@ -20,6 +20,7 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.List;
+import eu.stratosphere.runtime.io.api.RecordWriter;
import org.junit.Test;
import eu.stratosphere.core.io.StringRecord;
@@ -29,9 +30,8 @@ import eu.stratosphere.nephele.executiongraph.ExecutionGraph;
import eu.stratosphere.nephele.executiongraph.ExecutionVertex;
import eu.stratosphere.nephele.executiongraph.GraphConversionException;
import eu.stratosphere.nephele.instance.InstanceManager;
-import eu.stratosphere.nephele.io.RecordReader;
-import eu.stratosphere.nephele.io.RecordWriter;
-import eu.stratosphere.nephele.io.channels.ChannelType;
+import eu.stratosphere.runtime.io.api.RecordReader;
+import eu.stratosphere.runtime.io.channels.ChannelType;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.nephele.jobgraph.JobGraphDefinitionException;
import eu.stratosphere.nephele.jobgraph.JobInputVertex;
@@ -57,7 +57,7 @@ public class QueueSchedulerTest {
*/
@Override
public void registerInputOutput() {
- new RecordWriter<StringRecord>(this, StringRecord.class);
+ new RecordWriter<StringRecord>(this);
}
/**
@@ -145,7 +145,7 @@ public class QueueSchedulerTest {
final TestDeploymentManager tdm = new TestDeploymentManager();
final QueueScheduler scheduler = new QueueScheduler(tdm, tim);
- final ExecutionGraph executionGraph = createExecutionGraph(ChannelType.INMEMORY, tim);
+ final ExecutionGraph executionGraph = createExecutionGraph(ChannelType.IN_MEMORY, tim);
try {
try {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/managementgraph/ManagementGraphTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/managementgraph/ManagementGraphTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/managementgraph/ManagementGraphTest.java
index 4fd1ac1..630f365 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/managementgraph/ManagementGraphTest.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/managementgraph/ManagementGraphTest.java
@@ -19,7 +19,7 @@ import java.util.Iterator;
import org.junit.Test;
-import eu.stratosphere.nephele.io.channels.ChannelType;
+import eu.stratosphere.runtime.io.channels.ChannelType;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.util.ManagementTestUtils;
@@ -275,7 +275,7 @@ public class ManagementGraphTest {
// Group Edges
new ManagementGroupEdge(groupVertex1, 0, groupVertex2, 0, ChannelType.NETWORK);
new ManagementGroupEdge(groupVertex2, 0, groupVertex3, 0, ChannelType.NETWORK);
- new ManagementGroupEdge(groupVertex3, 0, groupVertex4, 0, ChannelType.INMEMORY);
+ new ManagementGroupEdge(groupVertex3, 0, groupVertex4, 0, ChannelType.IN_MEMORY);
// Edges
new ManagementEdge(new ManagementEdgeID(), new ManagementEdgeID(), outputGate1_1, 0, inputGate2_1, 0,
@@ -287,7 +287,7 @@ public class ManagementGraphTest {
new ManagementEdge(new ManagementEdgeID(), new ManagementEdgeID(), outputGate2_2, 0, inputGate3_1, 1,
ChannelType.NETWORK);
new ManagementEdge(new ManagementEdgeID(), new ManagementEdgeID(), outputGate3_1, 0, inputGate4_1, 0,
- ChannelType.INMEMORY);
+ ChannelType.IN_MEMORY);
return graph;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/profiling/impl/InstanceProfilerTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/profiling/impl/InstanceProfilerTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/profiling/impl/InstanceProfilerTest.java
index dda9491..f1b83a6 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/profiling/impl/InstanceProfilerTest.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/profiling/impl/InstanceProfilerTest.java
@@ -74,7 +74,7 @@ public class InstanceProfilerTest {
@Before
public void setUp() throws Exception {
initMocks(this);
- when(this.infoMock.getAddress()).thenReturn(this.addressMock);
+ when(this.infoMock.address()).thenReturn(this.addressMock);
when(this.addressMock.getHostAddress()).thenReturn("192.168.1.1");
whenNew(FileReader.class).withArguments(InstanceProfiler.PROC_STAT).thenReturn(this.cpuReaderMock);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/taskmanager/transferenvelope/DefaultDeserializerTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/taskmanager/transferenvelope/DefaultDeserializerTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/taskmanager/transferenvelope/DefaultDeserializerTest.java
deleted file mode 100644
index 1c9efd5..0000000
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/taskmanager/transferenvelope/DefaultDeserializerTest.java
+++ /dev/null
@@ -1,358 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.transferenvelope;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.ReadableByteChannel;
-import java.util.ArrayDeque;
-import java.util.Queue;
-
-import org.junit.Test;
-
-import eu.stratosphere.core.memory.MemorySegment;
-import eu.stratosphere.nephele.io.channels.Buffer;
-import eu.stratosphere.nephele.io.channels.BufferFactory;
-import eu.stratosphere.nephele.io.channels.ChannelID;
-import eu.stratosphere.nephele.io.channels.MemoryBuffer;
-import eu.stratosphere.nephele.jobgraph.JobID;
-import eu.stratosphere.nephele.taskmanager.bufferprovider.BufferAvailabilityListener;
-import eu.stratosphere.nephele.taskmanager.bufferprovider.BufferProvider;
-import eu.stratosphere.nephele.taskmanager.bufferprovider.BufferProviderBroker;
-import eu.stratosphere.nephele.util.BufferPoolConnector;
-import eu.stratosphere.nephele.util.InterruptibleByteChannel;
-import eu.stratosphere.util.StringUtils;
-
-/**
- * This class contains tests covering the deserialization of a byte stream to a transfer envelope.
- *
- */
-public class DefaultDeserializerTest {
-
- /**
- * The size of the test byte buffers in byte.
- */
- private static final int TEST_BUFFER_CAPACITY = 1024;
-
- /**
- * The sequence number to be used during the tests.
- */
- private static final int SEQUENCE_NUMBER = 0;
-
- /**
- * The job ID to be used during the tests.
- */
- private static final JobID JOB_ID = new JobID();
-
- /**
- * The channel ID to be used during the tests.
- */
- private static final ChannelID CHANNEL_ID = new ChannelID();
-
- /**
- * A dummy implementation of a {@link BufferProvider} which is used in this test.
- * <p>
- * This class is not thread-safe.
- *
- */
- private static final class TestBufferProvider implements BufferProvider {
-
- /**
- * Stores the available byte buffers.
- */
- private final Queue<MemorySegment> bufferPool;
-
- /**
- * Constructs a new test buffer provider.
- *
- * @param numberOfBuffers
- * the number of byte buffers this pool has available.
- */
- private TestBufferProvider(final int numberOfBuffers) {
-
- this.bufferPool = new ArrayDeque<MemorySegment>(numberOfBuffers);
- for (int i = 0; i < numberOfBuffers; ++i) {
- this.bufferPool.add(new MemorySegment(new byte[TEST_BUFFER_CAPACITY]));
- }
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public Buffer requestEmptyBuffer(final int minimumSizeOfBuffer) throws IOException {
-
- if (this.bufferPool.isEmpty()) {
- return null;
- }
-
- return BufferFactory.createFromMemory(minimumSizeOfBuffer, this.bufferPool.poll(),
- new BufferPoolConnector(this.bufferPool));
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public Buffer requestEmptyBufferBlocking(final int minimumSizeOfBuffer) throws IOException,
- InterruptedException {
-
- throw new IllegalStateException("requestEmptyBufferBlocking called");
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public int getMaximumBufferSize() {
-
- throw new IllegalStateException("getMaximumBufferSize called");
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public boolean isShared() {
-
- throw new IllegalStateException("isShared called");
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void reportAsynchronousEvent() {
-
- throw new IllegalStateException("reportAsynchronousEvent called");
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public boolean registerBufferAvailabilityListener(final BufferAvailabilityListener bufferAvailabilityListener) {
-
- throw new IllegalStateException("registerBufferAvailabilityListener called");
- }
- }
-
- /**
- * A dummy implementation of a {@link BufferProviderBroker} which is used during this test.
- * <p>
- * This class is not thread-safe.
- *
- */
- private static final class TestBufferProviderBroker implements BufferProviderBroker {
-
- private final BufferProvider bufferProvider;
-
- private TestBufferProviderBroker(final BufferProvider bufferProvider) {
- this.bufferProvider = bufferProvider;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public BufferProvider getBufferProvider(final JobID jobID, final ChannelID sourceChannelID) throws IOException,
- InterruptedException {
-
- return this.bufferProvider;
- }
- }
-
- /**
- * Constructs an {@link InterruptibleByteChannel} from which the deserializer to be tested can read its data.
- *
- * @param readInterruptPositions
- * the positions after which the byte stream shall be interrupted
- * @param testBufferSize
- * the size of the test buffer to create
- * @return an {@link InterruptibleByteChannel} holding the serialized data in memory
- * @throws IOException
- * thrown if an error occurs while serializing the original data
- */
- private ReadableByteChannel createByteChannel(final int[] readInterruptPositions, final int testBufferSize)
- throws IOException {
-
- final TransferEnvelope te = new TransferEnvelope(SEQUENCE_NUMBER, JOB_ID, CHANNEL_ID);
-
- if (testBufferSize >= 0) {
-
- if (testBufferSize > 100) {
- throw new IllegalStateException("Test buffer size can be 100 bytes at most");
- }
-
- final Queue<MemorySegment> bufferPool = new ArrayDeque<MemorySegment>();
- final MemorySegment ms = new MemorySegment(new byte[TEST_BUFFER_CAPACITY]);
-
- final MemoryBuffer buffer = BufferFactory.createFromMemory(ms.size(), ms, new BufferPoolConnector(bufferPool));
-
- final ByteBuffer srcBuffer = ByteBuffer.allocate(testBufferSize);
- for (int i = 0; i < testBufferSize; ++i) {
- srcBuffer.put((byte) i);
- }
- srcBuffer.flip();
-
- buffer.write(srcBuffer);
- buffer.flip();
- te.setBuffer(buffer);
- }
-
- final DefaultSerializer ds = new DefaultSerializer();
- ds.setTransferEnvelope(te);
-
- final InterruptibleByteChannel ibc = new InterruptibleByteChannel(null, readInterruptPositions);
-
- while (ds.write(ibc));
-
- ibc.switchToReadPhase();
-
- return ibc;
- }
-
- /**
- * Executes the deserialization method.
- *
- * @param rbc
- * the byte channel to read the serialized data from
- * @param bpb
- * the buffer provider broker to request empty buffers from
- * @return the deserialized transfer envelope
- * @throws IOException
- * thrown if an error occurs during the deserialization process
- * @throws NoBufferAvailableException
- * thrown if the buffer provider broker could not provide an empty buffer
- */
- private TransferEnvelope executeDeserialization(final ReadableByteChannel rbc, final BufferProviderBroker bpb)
- throws IOException, NoBufferAvailableException {
-
- final DefaultDeserializer dd = new DefaultDeserializer(bpb);
-
- TransferEnvelope te = dd.getFullyDeserializedTransferEnvelope();
- while (te == null) {
-
- dd.read(rbc);
- te = dd.getFullyDeserializedTransferEnvelope();
- }
-
- assertEquals(SEQUENCE_NUMBER, te.getSequenceNumber());
- assertEquals(JOB_ID, te.getJobID());
- assertEquals(CHANNEL_ID, te.getSource());
-
- return te;
- }
-
- /**
- * Tests the deserialization process of a {@link TransferEnvelope} with a buffer when no interruption of the byte
- * stream.
- */
- @Test
- public void testDeserializationWithBufferAndWithoutInterruption() {
-
- try {
-
- final ReadableByteChannel rbc = createByteChannel(null, 10);
-
- final TestBufferProviderBroker tbpb = new TestBufferProviderBroker(new TestBufferProvider(1));
-
- final TransferEnvelope te = executeDeserialization(rbc, tbpb);
-
- assertNotNull(te.getBuffer());
- assertEquals(10, te.getBuffer().size());
-
- } catch (IOException ioe) {
- fail(StringUtils.stringifyException(ioe));
- } catch (NoBufferAvailableException nbae) {
- fail(StringUtils.stringifyException(nbae));
- }
- }
-
- /**
- * Tests the deserialization process of a {@link TransferEnvelope} with a buffer and interruptions of the byte
- * stream.
- */
- @Test
- public void testDeserializationWithBufferAndInterruptions() {
-
- try {
-
- final ReadableByteChannel rbc = createByteChannel(new int[] { 3, 7, 24, 52 }, 10);
-
- final TestBufferProviderBroker tbpb = new TestBufferProviderBroker(new TestBufferProvider(1));
-
- final TransferEnvelope te = executeDeserialization(rbc, tbpb);
-
- assertNotNull(te.getBuffer());
- assertEquals(10, te.getBuffer().size());
-
- } catch (IOException ioe) {
- fail(StringUtils.stringifyException(ioe));
- } catch (NoBufferAvailableException nbae) {
- fail(StringUtils.stringifyException(nbae));
- }
- }
-
- /**
- * Tests the deserialization process of a {@link TransferEnvelope} without a buffer and without interruptions of the
- * byte stream.
- */
- @Test
- public void testDeserializationWithoutBufferAndInterruptions() {
-
- try {
-
- final ReadableByteChannel rbc = createByteChannel(null, -1);
-
- final TestBufferProviderBroker tbpb = new TestBufferProviderBroker(new TestBufferProvider(1));
-
- final TransferEnvelope te = executeDeserialization(rbc, tbpb);
-
- assertNull(te.getBuffer());
-
- } catch (IOException ioe) {
- fail(StringUtils.stringifyException(ioe));
- } catch (NoBufferAvailableException nbae) {
- fail(StringUtils.stringifyException(nbae));
- }
- }
-
- /**
- * Tests the deserialization process in case the buffer provide cannot deliver an empty buffer to read the byte
- * stream into.
- */
- @Test
- public void testDeserializationWithNoBufferAvailable() {
-
- try {
- final ReadableByteChannel rbc = createByteChannel(null, 10);
- final TestBufferProviderBroker tbpb = new TestBufferProviderBroker(new TestBufferProvider(0));
- executeDeserialization(rbc, tbpb);
-
- } catch (IOException ioe) {
- fail(StringUtils.stringifyException(ioe));
- } catch (NoBufferAvailableException nbae) {
- // Expected exception was successfully caught
- return;
- }
-
- fail("Expected NoBufferAvailableException but has not been thrown");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/taskmanager/transferenvelope/DefaultSerializerTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/taskmanager/transferenvelope/DefaultSerializerTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/taskmanager/transferenvelope/DefaultSerializerTest.java
deleted file mode 100644
index a50fbe2..0000000
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/taskmanager/transferenvelope/DefaultSerializerTest.java
+++ /dev/null
@@ -1,313 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.transferenvelope;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.util.ArrayDeque;
-import java.util.Deque;
-
-import org.junit.Test;
-
-import eu.stratosphere.core.memory.MemorySegment;
-import eu.stratosphere.nephele.io.AbstractID;
-import eu.stratosphere.nephele.io.channels.BufferFactory;
-import eu.stratosphere.nephele.io.channels.ChannelID;
-import eu.stratosphere.nephele.io.channels.MemoryBuffer;
-import eu.stratosphere.nephele.jobgraph.JobID;
-import eu.stratosphere.nephele.util.BufferPoolConnector;
-import eu.stratosphere.nephele.util.ServerTestUtils;
-
-/**
- * This class contains tests covering the serialization of transfer envelopes to a byte stream.
- *
- */
-public class DefaultSerializerTest {
-
- /**
- * The maximum size of the transfer envelope's buffer.
- */
- private static final int BUFFER_SIZE = 4096; // 4 KB;
-
- /**
- * An arbitrarily chosen byte used to fill the transfer envelope's buffer.
- */
- private static final byte BUFFER_CONTENT = 13;
-
- /**
- * The size of a sequence number.
- */
- private static final int SIZE_OF_SEQ_NR = 4;
-
- /**
- * The size of an ID.
- */
- private static final int SIZE_OF_ID = 16;
-
- /**
- * The size of an integer number.
- */
- private static final int SIZE_OF_INTEGER = 4;
-
- /**
- * The job ID used during the serialization process.
- */
- private final JobID jobID = new JobID();
-
- /**
- * The target channel ID used during the serialization process.
- */
- private final ChannelID sourceChannelID = new ChannelID();
-
- /**
- * Auxiliary class to explicitly access the internal buffer of an ID object.
- *
- */
- private static class SerializationTestID extends AbstractID {
-
- /**
- * Constructs a new ID.
- *
- * @param content
- * a byte buffer representing the ID
- */
- private SerializationTestID(byte[] content) {
- super(content);
- }
- }
-
- /**
- * This test checks the correctness of the serialization of {@link TransferEnvelope} objects.
- */
- @Test
- public void testSerialization() {
-
- try {
-
- // Generate test file
- final File testFile = generateDataStream();
-
- // Analyze the test file
- analyzeStream(testFile);
-
- // Delete the test file
- testFile.delete();
-
- } catch (IOException e) {
- fail(e.getMessage());
- }
- }
-
- /**
- * Generates and serializes a series of {@link TransferEnvelope} objects to a random file.
- *
- * @return the file containing the serializes envelopes
- * @throws IOException
- * thrown if an I/O error occurs while writing the envelopes
- */
- private File generateDataStream() throws IOException {
-
- final File outputFile = new File(ServerTestUtils.getTempDir() + File.separator
- + ServerTestUtils.getRandomFilename());
- final FileOutputStream outputStream = new FileOutputStream(outputFile);
- final FileChannel fileChannel = outputStream.getChannel();
- final Deque<MemorySegment> recycleQueue = new ArrayDeque<MemorySegment>();
- final DefaultSerializer serializer = new DefaultSerializer();
- final MemorySegment byteBuffer = new MemorySegment(new byte[BUFFER_SIZE]);
- final ByteBuffer initBuffer = ByteBuffer.allocate(1);
-
- // The byte buffer is initialized from this buffer
- initBuffer.put(BUFFER_CONTENT);
- initBuffer.flip();
-
- // Put byte buffer to recycled queue
- recycleQueue.add(byteBuffer);
-
- for (int i = 0; i < BUFFER_SIZE; i++) {
-
- final MemoryBuffer buffer = BufferFactory.createFromMemory(i, recycleQueue.poll(), new BufferPoolConnector(
- recycleQueue));
-
- // Initialize buffer
- for (int j = 0; j < i; j++) {
- buffer.write(initBuffer);
- initBuffer.position(0);
- }
- buffer.flip();
-
- final TransferEnvelope transferEnvelope = new TransferEnvelope(i, this.jobID, this.sourceChannelID);
- transferEnvelope.setBuffer(buffer);
-
- // set envelope to be serialized and write it to file channel
- serializer.setTransferEnvelope(transferEnvelope);
- while (serializer.write(fileChannel));
-
- // Put buffer back to the recycling queue
- buffer.recycleBuffer();
- }
-
- fileChannel.close();
-
- return outputFile;
- }
-
- /**
- * Analyzes the given test file and checks whether its content matches Nephele's serialization pattern.
- *
- * @param testFile
- * the test file to analyze
- * @throws IOException
- * thrown if an I/O error occurs while reading the test file
- */
- private void analyzeStream(File testFile) throws IOException {
-
- FileInputStream fileInputStream = new FileInputStream(testFile);
-
- for (int i = 0; i < BUFFER_SIZE; i++) {
-
- readAndCheckSequenceNumber(fileInputStream, i);
- readAndCheckID(fileInputStream, this.jobID);
- readAndCheckID(fileInputStream, this.sourceChannelID);
- readAndCheckNotificationList(fileInputStream);
- readAndCheckBuffer(fileInputStream, i);
- }
-
- fileInputStream.close();
- }
-
- /**
- * Attempts to read a buffer of the given size from the file stream and checks the buffer's content.
- *
- * @param fileInputStream
- * the file stream to read from
- * @param expectedBufferSize
- * the expected size of the buffer
- * @throws IOException
- * thrown if an error occurs while reading from the file stream
- */
- private static void readAndCheckBuffer(FileInputStream fileInputStream, int expectedBufferSize) throws IOException {
-
- // Check if buffer exists
- assertEquals(1L, fileInputStream.read());
-
- byte[] temp = new byte[SIZE_OF_INTEGER];
- fileInputStream.read(temp);
- int bufferSize = bufferToInteger(temp);
-
- assertEquals(expectedBufferSize, bufferSize);
-
- byte[] buffer = new byte[bufferSize];
- int r = fileInputStream.read(buffer);
- for (int i = 0; i < buffer.length; i++) {
- assertEquals(BUFFER_CONTENT, buffer[i]);
- }
- }
-
- /**
- * Attempts to read an empty notification list from the given file input stream.
- *
- * @param fileInputStream
- * the file input stream to read from
- * @throws IOException
- * thrown if an I/O occurs while reading data from the stream
- */
- private void readAndCheckNotificationList(FileInputStream fileInputStream) throws IOException {
-
- if (fileInputStream.read() != 0) {
-
- byte[] temp = new byte[SIZE_OF_INTEGER];
-
- fileInputStream.read(temp);
- final int sizeOfDataBlock = bufferToInteger(temp);
-
- assertEquals(SIZE_OF_INTEGER, sizeOfDataBlock);
-
- fileInputStream.read(temp);
- final int sizeOfNotificationList = bufferToInteger(temp);
-
- assertEquals(0, sizeOfNotificationList);
- }
- }
-
- /**
- * Attempts to read an integer number from the given file input stream and compares it to
- * <code>expectedSequenceNumber</code>.
- *
- * @param fileInputStream
- * the file input stream to read from
- * @param expectedSeqNumber
- * the integer number the read number is expected to match
- * @throws IOException
- * thrown if an I/O occurs while reading data from the stream
- */
- private void readAndCheckSequenceNumber(FileInputStream fileInputStream, int expectedSeqNumber) throws IOException {
-
- byte[] temp = new byte[SIZE_OF_SEQ_NR];
- fileInputStream.read(temp);
- int seqNumber = bufferToInteger(temp);
-
- assertEquals(seqNumber, expectedSeqNumber);
- }
-
- /**
- * Attempts to read a channel ID from the given file input stream and compares it to <code>expectedChannelID</code>.
- *
- * @param fileInputStream
- * the file input stream to read from
- * @param expectedID
- * the ID which the read ID is expected to match
- * @throws IOException
- * thrown if an I/O occurs while reading data from the stream
- */
- private void readAndCheckID(FileInputStream fileInputStream, AbstractID expectedID) throws IOException {
-
- byte[] temp = new byte[SIZE_OF_INTEGER];
- fileInputStream.read(temp);
-
- final int sizeOfID = bufferToInteger(temp); // ID has fixed size and therefore does not announce its size
-
- assertEquals(sizeOfID, SIZE_OF_ID);
-
- byte[] id = new byte[sizeOfID];
- fileInputStream.read(id);
-
- final AbstractID channelID = new SerializationTestID(id);
- assertEquals(expectedID, channelID);
- }
-
- /**
- * Converts the first four bytes of the provided buffer's content to an integer number.
- *
- * @param buffer
- * the buffer to convert
- * @return the integer number converted from the first four bytes of the buffer's content
- */
- private static int bufferToInteger(byte[] buffer) {
-
- int integer = 0;
-
- for (int i = 0; i < SIZE_OF_INTEGER; ++i) {
- integer |= (buffer[SIZE_OF_INTEGER - 1 - i] & 0xff) << (i << 3);
- }
-
- return integer;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/BufferPoolConnector.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/BufferPoolConnector.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/BufferPoolConnector.java
deleted file mode 100644
index 3f2c79e..0000000
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/BufferPoolConnector.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.util;
-
-import java.util.Queue;
-
-import eu.stratosphere.core.memory.MemorySegment;
-import eu.stratosphere.nephele.io.channels.MemoryBufferPoolConnector;
-
-/**
- * This is a simple implementation of a {@link MemoryBufferPoolConnector} used for the server unit tests.
- * <p>
- * This class is thread-safe.
- *
- */
-public final class BufferPoolConnector implements MemoryBufferPoolConnector {
-
- /**
- * Reference to the memory pool the byte buffer was originally taken from.
- */
- private final Queue<MemorySegment> memoryPool;
-
- /**
- * Constructs a new buffer pool connector
- *
- * @param bufferPool
- * a reference to the memory pool the byte buffer was originally taken from
- */
- public BufferPoolConnector(final Queue<MemorySegment> bufferPool) {
- this.memoryPool = bufferPool;
- }
-
-
- @Override
- public void recycle(final MemorySegment memSeg) {
-
- synchronized (this.memoryPool) {
- this.memoryPool.add(memSeg);
- this.memoryPool.notify();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/DiscardingRecycler.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/DiscardingRecycler.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/DiscardingRecycler.java
new file mode 100644
index 0000000..9d4d2a5
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/DiscardingRecycler.java
@@ -0,0 +1,24 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+package eu.stratosphere.nephele.util;
+
+import eu.stratosphere.core.memory.MemorySegment;
+import eu.stratosphere.runtime.io.BufferRecycler;
+
+public class DiscardingRecycler implements BufferRecycler {
+
+ @Override
+ public void recycle(MemorySegment memSeg) {}
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/FileLineReader.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/FileLineReader.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/FileLineReader.java
new file mode 100644
index 0000000..fcb4fa1
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/FileLineReader.java
@@ -0,0 +1,80 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.nephele.util;
+
+import java.util.Iterator;
+
+import eu.stratosphere.core.fs.FSDataInputStream;
+import eu.stratosphere.core.fs.FileInputSplit;
+import eu.stratosphere.core.fs.FileSystem;
+import eu.stratosphere.core.io.StringRecord;
+import eu.stratosphere.runtime.io.api.RecordWriter;
+import eu.stratosphere.nephele.template.AbstractFileInputTask;
+import eu.stratosphere.runtime.fs.LineReader;
+
+/**
+ * A file line reader reads the associated file input splits line by line and outputs the lines as string records.
+ *
+ */
+public class FileLineReader extends AbstractFileInputTask {
+
+ private RecordWriter<StringRecord> output = null;
+
+ @Override
+ public void invoke() throws Exception {
+
+ output.initializeSerializers();
+
+ final Iterator<FileInputSplit> splitIterator = getFileInputSplits();
+
+ while (splitIterator.hasNext()) {
+
+ final FileInputSplit split = splitIterator.next();
+
+ long start = split.getStart();
+ long length = split.getLength();
+
+ final FileSystem fs = FileSystem.get(split.getPath().toUri());
+
+ final FSDataInputStream fdis = fs.open(split.getPath());
+
+ final LineReader lineReader = new LineReader(fdis, start, length, (1024 * 1024));
+
+ byte[] line = lineReader.readLine();
+
+ while (line != null) {
+
+ // Create a string object from the data read
+ StringRecord str = new StringRecord();
+ str.set(line);
+
+ // Send out string
+ output.emit(str);
+
+ line = lineReader.readLine();
+ }
+
+ // Close the stream;
+ lineReader.close();
+ }
+
+ this.output.flush();
+ }
+
+ @Override
+ public void registerInputOutput() {
+ output = new RecordWriter<StringRecord>(this);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/FileLineWriter.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/FileLineWriter.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/FileLineWriter.java
new file mode 100644
index 0000000..bc738df
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/FileLineWriter.java
@@ -0,0 +1,75 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.nephele.util;
+
+import eu.stratosphere.core.fs.FSDataOutputStream;
+import eu.stratosphere.core.fs.FileStatus;
+import eu.stratosphere.core.fs.FileSystem;
+import eu.stratosphere.core.fs.Path;
+import eu.stratosphere.core.io.StringRecord;
+import eu.stratosphere.runtime.io.api.RecordReader;
+import eu.stratosphere.nephele.template.AbstractFileOutputTask;
+
+/**
+ * A file line writer reads string records its input gate and writes them to the associated output file.
+ *
+ */
+public class FileLineWriter extends AbstractFileOutputTask {
+
+ /**
+ * The record reader through which incoming string records are received.
+ */
+ private RecordReader<StringRecord> input = null;
+
+
+ @Override
+ public void invoke() throws Exception {
+
+ Path outputPath = getFileOutputPath();
+
+ FileSystem fs = FileSystem.get(outputPath.toUri());
+ if (fs.exists(outputPath)) {
+ FileStatus status = fs.getFileStatus(outputPath);
+
+ if (status.isDir()) {
+ outputPath = new Path(outputPath.toUri().toString() + "/file_" + getIndexInSubtaskGroup() + ".txt");
+ }
+ }
+
+ final FSDataOutputStream outputStream = fs.create(outputPath, true);
+
+ while (this.input.hasNext()) {
+
+ StringRecord record = this.input.next();
+ byte[] recordByte = (record.toString() + "\r\n").getBytes();
+ outputStream.write(recordByte, 0, recordByte.length);
+ }
+
+ outputStream.close();
+
+ }
+
+
+ @Override
+ public void registerInputOutput() {
+ this.input = new RecordReader<StringRecord>(this, StringRecord.class);
+ }
+
+
+ @Override
+ public int getMaximumNumberOfSubtasks() {
+ // The default implementation always returns -1
+ return -1;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/TestBufferProvider.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/TestBufferProvider.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/TestBufferProvider.java
new file mode 100644
index 0000000..09b244f
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/TestBufferProvider.java
@@ -0,0 +1,76 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+package eu.stratosphere.nephele.util;
+
+import eu.stratosphere.core.memory.MemorySegment;
+import eu.stratosphere.runtime.io.Buffer;
+import eu.stratosphere.runtime.io.BufferRecycler;
+import eu.stratosphere.runtime.io.network.bufferprovider.BufferAvailabilityListener;
+import eu.stratosphere.runtime.io.network.bufferprovider.BufferProvider;
+
+import java.io.IOException;
+import java.util.Random;
+
+public class TestBufferProvider implements BufferProvider {
+
+ private final BufferRecycler recycler = new DiscardingRecycler();
+
+ private final Random rnd = new Random();
+
+ private final int sizeOfMemorySegments;
+
+ private final float probabilityForNoneAvailable;
+
+
+ public TestBufferProvider(int sizeOfMemorySegments) {
+ this(sizeOfMemorySegments, -1.0f);
+ }
+
+ public TestBufferProvider(int sizeOfMemorySegments, float probabilityForNoneAvailable) {
+ this.sizeOfMemorySegments = sizeOfMemorySegments;
+ this.probabilityForNoneAvailable = probabilityForNoneAvailable;
+ }
+
+ @Override
+ public Buffer requestBuffer(int sizeOfBuffer) throws IOException {
+ if (rnd.nextFloat() < this.probabilityForNoneAvailable) {
+ return null;
+ } else {
+ MemorySegment segment = new MemorySegment(new byte[this.sizeOfMemorySegments]);
+ return new Buffer(segment, sizeOfBuffer, this.recycler);
+ }
+ }
+
+ @Override
+ public Buffer requestBufferBlocking(int sizeOfBuffer) throws IOException, InterruptedException {
+ MemorySegment segment = new MemorySegment(new byte[this.sizeOfMemorySegments]);
+ return new Buffer(segment, sizeOfBuffer, this.recycler);
+ }
+
+ @Override
+ public int getBufferSize() {
+ return Integer.MAX_VALUE;
+ }
+
+ @Override
+ public void reportAsynchronousEvent() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean registerBufferAvailabilityListener(BufferAvailabilityListener bufferAvailabilityListener) {
+ throw new UnsupportedOperationException();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/DataSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/DataSinkTaskTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/DataSinkTaskTest.java
index 6c66b78..bfd0d42 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/DataSinkTaskTest.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/DataSinkTaskTest.java
@@ -42,8 +42,12 @@ import eu.stratosphere.types.IntValue;
import eu.stratosphere.types.Key;
import eu.stratosphere.types.Record;
-public class DataSinkTaskTest extends TaskTestBase
-{
+public class DataSinkTaskTest extends TaskTestBase {
+
+ private static final int MEMORY_MANAGER_SIZE = 1024 * 1024;
+
+ private static final int NETWORK_BUFFER_SIZE = 1024;
+
private static final Log LOG = LogFactory.getLog(DataSinkTaskTest.class);
private final String tempTestPath = Path.constructTestPath("dst_test");
@@ -61,8 +65,8 @@ public class DataSinkTaskTest extends TaskTestBase
int keyCnt = 100;
int valCnt = 20;
-
- super.initEnvironment(1024 * 1024);
+
+ super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
super.addInput(new UniformRecordGenerator(keyCnt, valCnt, false), 0);
DataSinkTask<Record> testTask = new DataSinkTask<Record>();
@@ -127,8 +131,8 @@ public class DataSinkTaskTest extends TaskTestBase
int keyCnt = 100;
int valCnt = 20;
-
- super.initEnvironment(1024 * 1024);
+
+ super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
super.addInput(new UniformRecordGenerator(keyCnt, valCnt, 0, 0, false), 0);
super.addInput(new UniformRecordGenerator(keyCnt, valCnt, keyCnt, 0, false), 0);
super.addInput(new UniformRecordGenerator(keyCnt, valCnt, keyCnt*2, 0, false), 0);
@@ -197,8 +201,8 @@ public class DataSinkTaskTest extends TaskTestBase
int keyCnt = 100;
int valCnt = 20;
-
- super.initEnvironment(1024 * 1024 * 4);
+
+ super.initEnvironment(MEMORY_MANAGER_SIZE * 4, NETWORK_BUFFER_SIZE);
super.addInput(new UniformRecordGenerator(keyCnt, valCnt, true), 0);
DataSinkTask<Record> testTask = new DataSinkTask<Record>();
@@ -275,8 +279,8 @@ public class DataSinkTaskTest extends TaskTestBase
int keyCnt = 100;
int valCnt = 20;
-
- super.initEnvironment(1024 * 1024);
+
+ super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
super.addInput(new UniformRecordGenerator(keyCnt, valCnt, false), 0);
DataSinkTask<Record> testTask = new DataSinkTask<Record>();
@@ -307,8 +311,8 @@ public class DataSinkTaskTest extends TaskTestBase
int keyCnt = 100;
int valCnt = 20;
-
- super.initEnvironment(4 * 1024 * 1024);
+
+ super.initEnvironment(MEMORY_MANAGER_SIZE * 4, NETWORK_BUFFER_SIZE);
super.addInput(new UniformRecordGenerator(keyCnt, valCnt, true), 0);
DataSinkTask<Record> testTask = new DataSinkTask<Record>();
@@ -343,8 +347,8 @@ public class DataSinkTaskTest extends TaskTestBase
@Test
public void testCancelDataSinkTask() {
-
- super.initEnvironment(1024 * 1024);
+
+ super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
super.addInput(new InfiniteInputIterator(), 0);
final DataSinkTask<Record> testTask = new DataSinkTask<Record>();
@@ -385,8 +389,8 @@ public class DataSinkTaskTest extends TaskTestBase
@Test
@SuppressWarnings("unchecked")
public void testCancelSortingDataSinkTask() {
-
- super.initEnvironment(4 * 1024 * 1024);
+
+ super.initEnvironment(MEMORY_MANAGER_SIZE * 4, NETWORK_BUFFER_SIZE);
super.addInput(new InfiniteInputIterator(), 0);
final DataSinkTask<Record> testTask = new DataSinkTask<Record>();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/DataSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/DataSourceTaskTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/DataSourceTaskTest.java
index 0198db2..732cf8d 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/DataSourceTaskTest.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/DataSourceTaskTest.java
@@ -37,8 +37,12 @@ import eu.stratosphere.types.IntValue;
import eu.stratosphere.types.Record;
import eu.stratosphere.util.MutableObjectIterator;
-public class DataSourceTaskTest extends TaskTestBase
-{
+public class DataSourceTaskTest extends TaskTestBase {
+
+ private static final int MEMORY_MANAGER_SIZE = 1024 * 1024;
+
+ private static final int NETWORK_BUFFER_SIZE = 1024;
+
private List<Record> outList;
private String tempTestPath = Path.constructTestPath("dst_test");
@@ -50,11 +54,9 @@ public class DataSourceTaskTest extends TaskTestBase
tempTestFile.delete();
}
}
-
@Test
public void testDataSourceTask() {
-
int keyCnt = 100;
int valCnt = 20;
@@ -67,7 +69,7 @@ public class DataSourceTaskTest extends TaskTestBase
Assert.fail("Unable to set-up test input file");
}
- super.initEnvironment(1024 * 1024);
+ super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
super.addOutput(this.outList);
DataSourceTask<Record> testTask = new DataSourceTask<Record>();
@@ -110,7 +112,6 @@ public class DataSourceTaskTest extends TaskTestBase
@Test
public void testFailingDataSourceTask() {
-
int keyCnt = 20;
int valCnt = 10;
@@ -122,8 +123,8 @@ public class DataSourceTaskTest extends TaskTestBase
} catch (IOException e1) {
Assert.fail("Unable to set-up test input file");
}
-
- super.initEnvironment(1024 * 1024);
+
+ super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
super.addOutput(this.outList);
DataSourceTask<Record> testTask = new DataSourceTask<Record>();
@@ -148,11 +149,10 @@ public class DataSourceTaskTest extends TaskTestBase
@Test
public void testCancelDataSourceTask() {
-
int keyCnt = 20;
int valCnt = 4;
-
- super.initEnvironment(1024 * 1024);
+
+ super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
super.addOutput(new NirvanaOutputList());
try {
@@ -184,7 +184,7 @@ public class DataSourceTaskTest extends TaskTestBase
try {
tct.join();
- taskRunner.join();
+ taskRunner.join();
} catch(InterruptedException ie) {
Assert.fail("Joining threads failed");
}
@@ -192,12 +192,10 @@ public class DataSourceTaskTest extends TaskTestBase
// assert that temp file was created
File tempTestFile = new File(this.tempTestPath);
Assert.assertTrue("Temp output file does not exist",tempTestFile.exists());
-
}
- private static class InputFilePreparator
- {
+ private static class InputFilePreparator {
public static void prepareInputFile(MutableObjectIterator<Record> inIt, String inputFilePath, boolean insertInvalidData)
throws IOException
{
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/chaining/ChainTaskTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/chaining/ChainTaskTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/chaining/ChainTaskTest.java
index 100bf7b..dda215e 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/chaining/ChainTaskTest.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/chaining/ChainTaskTest.java
@@ -43,7 +43,11 @@ import eu.stratosphere.util.LogUtils;
public class ChainTaskTest extends TaskTestBase {
-
+
+ private static final int MEMORY_MANAGER_SIZE = 1024 * 1024 * 3;
+
+ private static final int NETWORK_BUFFER_SIZE = 1024;
+
private final List<Record> outList = new ArrayList<Record>();
@SuppressWarnings("unchecked")
@@ -65,9 +69,8 @@ public class ChainTaskTest extends TaskTestBase {
final int valCnt = 20;
try {
-
// environment
- initEnvironment(3*1024*1024);
+ super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
addInput(new UniformRecordGenerator(keyCnt, valCnt, false), 0);
addOutput(this.outList);
@@ -123,7 +126,7 @@ public class ChainTaskTest extends TaskTestBase {
try {
// environment
- initEnvironment(3*1024*1024);
+ super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
addInput(new UniformRecordGenerator(keyCnt, valCnt, false), 0);
addOutput(this.outList);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/OutputEmitterTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/OutputEmitterTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/OutputEmitterTest.java
index cc00387..c66d821 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/OutputEmitterTest.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/OutputEmitterTest.java
@@ -26,6 +26,7 @@ import org.apache.commons.lang.NotImplementedException;
import org.junit.Test;
import eu.stratosphere.api.common.typeutils.TypeComparator;
+import eu.stratosphere.runtime.io.api.ChannelSelector;
import eu.stratosphere.api.common.typeutils.base.IntSerializer;
import eu.stratosphere.api.java.typeutils.runtime.record.RecordComparatorFactory;
import eu.stratosphere.api.java.typeutils.runtime.record.RecordSerializerFactory;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/RecordOutputEmitterTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/RecordOutputEmitterTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/RecordOutputEmitterTest.java
index 8afc78f..0b968d8 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/RecordOutputEmitterTest.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/RecordOutputEmitterTest.java
@@ -27,6 +27,8 @@ import org.junit.Test;
import eu.stratosphere.api.common.distributions.DataDistribution;
import eu.stratosphere.api.common.distributions.UniformIntegerDistribution;
+import eu.stratosphere.runtime.io.api.ChannelSelector;
+import eu.stratosphere.pact.runtime.plugable.pactrecord.RecordComparator;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.api.java.typeutils.runtime.record.RecordComparator;
import eu.stratosphere.pact.runtime.shipping.RecordOutputEmitter;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/MockEnvironment.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/MockEnvironment.java
index 8e25082..a397312 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/MockEnvironment.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/MockEnvironment.java
@@ -13,38 +13,40 @@
package eu.stratosphere.pact.runtime.test.util;
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.FutureTask;
-
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.core.fs.Path;
import eu.stratosphere.core.io.IOReadableWritable;
+import eu.stratosphere.core.memory.MemorySegment;
import eu.stratosphere.nephele.execution.Environment;
-import eu.stratosphere.nephele.io.ChannelSelector;
-import eu.stratosphere.nephele.io.GateID;
-import eu.stratosphere.nephele.io.InputChannelResult;
-import eu.stratosphere.nephele.io.InputGate;
-import eu.stratosphere.nephele.io.MutableRecordDeserializerFactory;
-import eu.stratosphere.nephele.io.OutputGate;
-import eu.stratosphere.nephele.io.RecordAvailabilityListener;
-import eu.stratosphere.nephele.io.RecordDeserializerFactory;
-import eu.stratosphere.nephele.io.RuntimeInputGate;
-import eu.stratosphere.nephele.io.RuntimeOutputGate;
-import eu.stratosphere.nephele.io.channels.ChannelID;
+import eu.stratosphere.runtime.io.gates.InputChannelResult;
+import eu.stratosphere.runtime.io.gates.RecordAvailabilityListener;
+import eu.stratosphere.runtime.io.serialization.AdaptiveSpanningRecordDeserializer;
+import eu.stratosphere.runtime.io.Buffer;
+import eu.stratosphere.runtime.io.channels.ChannelID;
+import eu.stratosphere.runtime.io.gates.GateID;
+import eu.stratosphere.runtime.io.gates.InputGate;
+import eu.stratosphere.runtime.io.gates.OutputGate;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.protocols.AccumulatorProtocol;
import eu.stratosphere.nephele.services.iomanager.IOManager;
import eu.stratosphere.nephele.services.memorymanager.MemoryManager;
import eu.stratosphere.nephele.services.memorymanager.spi.DefaultMemoryManager;
+import eu.stratosphere.runtime.io.network.bufferprovider.BufferAvailabilityListener;
+import eu.stratosphere.runtime.io.network.bufferprovider.BufferProvider;
+import eu.stratosphere.runtime.io.network.bufferprovider.GlobalBufferPool;
+import eu.stratosphere.runtime.io.network.bufferprovider.LocalBufferPoolOwner;
import eu.stratosphere.nephele.template.InputSplitProvider;
+import eu.stratosphere.runtime.io.serialization.RecordDeserializer;
+import eu.stratosphere.runtime.io.serialization.RecordDeserializer.DeserializationResult;
import eu.stratosphere.types.Record;
import eu.stratosphere.util.MutableObjectIterator;
-public class MockEnvironment implements Environment {
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
+public class MockEnvironment implements Environment, BufferProvider, LocalBufferPoolOwner {
private final MemoryManager memManager;
@@ -56,21 +58,24 @@ public class MockEnvironment implements Environment {
private final Configuration taskConfiguration;
- private final List<RuntimeInputGate<Record>> inputs;
+ private final List<InputGate<Record>> inputs;
- private final List<RuntimeOutputGate<Record>> outputs;
+ private final List<OutputGate> outputs;
private final JobID jobID = new JobID();
- public MockEnvironment(long memorySize, MockInputSplitProvider inputSplitProvider) {
+ private final Buffer mockBuffer;
+
+ public MockEnvironment(long memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize) {
this.jobConfiguration = new Configuration();
this.taskConfiguration = new Configuration();
- this.inputs = new LinkedList<RuntimeInputGate<Record>>();
- this.outputs = new LinkedList<RuntimeOutputGate<Record>>();
+ this.inputs = new LinkedList<InputGate<Record>>();
+ this.outputs = new LinkedList<OutputGate>();
this.memManager = new DefaultMemoryManager(memorySize);
this.ioManager = new IOManager(System.getProperty("java.io.tmpdir"));
this.inputSplitProvider = inputSplitProvider;
+ this.mockBuffer = new Buffer(new MemorySegment(new byte[bufferSize]), bufferSize, null);
}
public void addInput(MutableObjectIterator<Record> inputIterator) {
@@ -103,13 +108,62 @@ public class MockEnvironment implements Environment {
return this.jobID;
}
+ @Override
+ public Buffer requestBuffer(int minBufferSize) throws IOException {
+ return mockBuffer;
+ }
+
+ @Override
+ public Buffer requestBufferBlocking(int minBufferSize) throws IOException, InterruptedException {
+ return mockBuffer;
+ }
+
+ @Override
+ public int getBufferSize() {
+ return this.mockBuffer.size();
+ }
+
+ @Override
+ public boolean registerBufferAvailabilityListener(BufferAvailabilityListener listener) {
+ return false;
+ }
+
+ @Override
+ public int getNumberOfChannels() {
+ return 1;
+ }
+
+ @Override
+ public void setDesignatedNumberOfBuffers(int numBuffers) {
+
+ }
+
+ @Override
+ public void clearLocalBufferPool() {
+
+ }
+
+ @Override
+ public void registerGlobalBufferPool(GlobalBufferPool globalBufferPool) {
+
+ }
+
+ @Override
+ public void logBufferUtilization() {
+
+ }
+
+ @Override
+ public void reportAsynchronousEvent() {
- private static class MockInputGate extends RuntimeInputGate<Record> {
+ }
+
+ private static class MockInputGate extends InputGate<Record> {
private MutableObjectIterator<Record> it;
public MockInputGate(int id, MutableObjectIterator<Record> it) {
- super(new JobID(), new GateID(), MutableRecordDeserializerFactory.<Record>get(), id);
+ super(new JobID(), new GateID(), id);
this.it = it;
}
@@ -132,18 +186,43 @@ public class MockEnvironment implements Environment {
}
}
- private static class MockOutputGate extends RuntimeOutputGate<Record> {
+ private class MockOutputGate extends OutputGate {
private List<Record> out;
+ private RecordDeserializer<Record> deserializer;
+
+ private Record record;
+
public MockOutputGate(int index, List<Record> outList) {
- super(new JobID(), new GateID(), Record.class, index, null, false);
+ super(new JobID(), new GateID(), index);
this.out = outList;
+ this.deserializer = new AdaptiveSpanningRecordDeserializer<Record>();
+ this.record = new Record();
}
@Override
- public void writeRecord(Record record) throws IOException, InterruptedException {
- out.add(record.createCopy());
+ public void sendBuffer(Buffer buffer, int targetChannel) throws IOException, InterruptedException {
+
+ this.deserializer.setNextMemorySegment(MockEnvironment.this.mockBuffer.getMemorySegment(), MockEnvironment.this.mockBuffer.size());
+
+ while (this.deserializer.hasUnfinishedData()) {
+ DeserializationResult result = this.deserializer.getNextRecord(this.record);
+
+ if (result.isFullRecord()) {
+ this.out.add(this.record.createCopy());
+ }
+
+ if (result == DeserializationResult.LAST_RECORD_FROM_BUFFER ||
+ result == DeserializationResult.PARTIAL_RECORD) {
+ break;
+ }
+ }
+ }
+
+ @Override
+ public int getNumChannels() {
+ return 1;
}
}
@@ -188,11 +267,6 @@ public class MockEnvironment implements Environment {
}
@Override
- public GateID getNextUnboundOutputGateID() {
- return null;
- }
-
- @Override
public int getNumberOfOutputGates() {
return this.outputs.size();
}
@@ -203,16 +277,6 @@ public class MockEnvironment implements Environment {
}
@Override
- public void registerOutputGate(final OutputGate<? extends IOReadableWritable> outputGate) {
- // Nothing to do here
- }
-
- @Override
- public void registerInputGate(final InputGate<? extends IOReadableWritable> inputGate) {
- // Nothing to do here
- }
-
- @Override
public Set<ChannelID> getOutputChannelIDs() {
throw new IllegalStateException("getOutputChannelIDs called on MockEnvironment");
}
@@ -242,18 +306,14 @@ public class MockEnvironment implements Environment {
throw new IllegalStateException("getInputChannelIDsOfGate called on MockEnvironment");
}
- @SuppressWarnings("unchecked")
@Override
- public <T extends IOReadableWritable> OutputGate<T> createOutputGate(GateID gateID, Class<T> outputClass,
- ChannelSelector<T> selector, boolean isBroadcast)
+ public OutputGate createAndRegisterOutputGate()
{
- return (OutputGate<T>) this.outputs.remove(0);
+ return this.outputs.remove(0);
}
- @SuppressWarnings("unchecked")
@Override
- public <T extends IOReadableWritable> InputGate<T> createInputGate(GateID gateID,
- RecordDeserializerFactory<T> deserializerFactory)
+ public <T extends IOReadableWritable> InputGate<T> createAndRegisterInputGate()
{
return (InputGate<T>) this.inputs.remove(0);
}
@@ -275,8 +335,7 @@ public class MockEnvironment implements Environment {
}
@Override
- public Map<String, FutureTask<Path>> getCopyTask() {
- return null;
+ public BufferProvider getOutputBufferProvider() {
+ return this;
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/TaskTestBase.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/TaskTestBase.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/TaskTestBase.java
index 826113c..a60b479 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/TaskTestBase.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/TaskTestBase.java
@@ -49,10 +49,10 @@ public abstract class TaskTestBase {
protected MockEnvironment mockEnv;
- public void initEnvironment(long memorySize) {
+ public void initEnvironment(long memorySize, int bufferSize) {
this.memorySize = memorySize;
this.inputSplitProvider = new MockInputSplitProvider();
- this.mockEnv = new MockEnvironment(this.memorySize, this.inputSplitProvider);
+ this.mockEnv = new MockEnvironment(this.memorySize, this.inputSplitProvider, bufferSize);
}
public void addInput(MutableObjectIterator<Record> input, int groupId) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/fs/LineReaderTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/fs/LineReaderTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/fs/LineReaderTest.java
new file mode 100644
index 0000000..af46689
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/fs/LineReaderTest.java
@@ -0,0 +1,78 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.fs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.PrintWriter;
+
+import org.junit.Test;
+
+import eu.stratosphere.core.fs.FSDataInputStream;
+import eu.stratosphere.core.fs.Path;
+import eu.stratosphere.core.fs.local.LocalFileSystem;
+import eu.stratosphere.nephele.util.CommonTestUtils;
+
+/**
+ * This class tests the functionality of the LineReader class using a local filesystem.
+ *
+ */
+
+public class LineReaderTest {
+
+ /**
+ * This test tests the LineReader. So far only under usual conditions.
+ */
+ @Test
+ public void testLineReader() {
+ final File testfile = new File(CommonTestUtils.getTempDir() + File.separator
+ + CommonTestUtils.getRandomFilename());
+ final Path pathtotestfile = new Path(testfile.toURI().getPath());
+
+ try {
+ PrintWriter pw = new PrintWriter(testfile, "UTF8");
+
+ for (int i = 0; i < 100; i++) {
+ pw.append("line\n");
+ }
+ pw.close();
+
+ LocalFileSystem lfs = new LocalFileSystem();
+ FSDataInputStream fis = lfs.open(pathtotestfile);
+
+ // first, we test under "usual" conditions
+ final LineReader lr = new LineReader(fis, 0, testfile.length(), 256);
+
+ byte[] buffer;
+ int linecount = 0;
+ while ((buffer = lr.readLine()) != null) {
+ assertEquals(new String(buffer, "UTF8"), "line");
+ linecount++;
+ }
+ assertEquals(linecount, 100);
+
+ // the linereader can not handle situations with larger length than the total file...
+
+ } catch (Exception e) {
+ fail(e.toString());
+ e.printStackTrace();
+ } finally {
+ testfile.delete();
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/fs/s3/S3FileSystemTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/fs/s3/S3FileSystemTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/fs/s3/S3FileSystemTest.java
new file mode 100644
index 0000000..30b5219
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/fs/s3/S3FileSystemTest.java
@@ -0,0 +1,460 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.fs.s3;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import eu.stratosphere.configuration.Configuration;
+import eu.stratosphere.configuration.GlobalConfiguration;
+import eu.stratosphere.core.fs.BlockLocation;
+import eu.stratosphere.core.fs.FSDataInputStream;
+import eu.stratosphere.core.fs.FSDataOutputStream;
+import eu.stratosphere.core.fs.FileStatus;
+import eu.stratosphere.core.fs.FileSystem;
+import eu.stratosphere.core.fs.Path;
+
+/**
+ * This test checks the S3 implementation of the {@link FileSystem} interface.
+ *
+ */
+public class S3FileSystemTest {
+
+ /**
+ * The length of the bucket/object names used in this test.
+ */
+ private static final int NAME_LENGTH = 32;
+
+ /**
+ * The alphabet to generate the random bucket/object names from.
+ */
+ private static final char[] ALPHABET = { 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o',
+ 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9' };
+
+ /**
+ * The size of the byte buffer used during the tests in bytes.
+ */
+ private static final int TEST_BUFFER_SIZE = 128;
+
+ /**
+ * The size of the small test file in bytes.
+ */
+ private static final int SMALL_FILE_SIZE = 512;
+
+ /**
+ * The size of the large test file in bytes.
+ */
+ private static final int LARGE_FILE_SIZE = 1024 * 1024 * 12; // 12 MB
+
+ /**
+ * The modulus to be used when generating the test data. Must not be larger than 128.
+ */
+ private static final int MODULUS = 128;
+
+ private static final String S3_BASE_URI = "s3:///";
+
+ /**
+ * Tries to read the AWS access key and the AWS secret key from the environments variables. If accessing these keys
+ * fails, all tests will be skipped and marked as successful.
+ */
+ @Before
+ public void initKeys() {
+ final String accessKey = System.getenv("AK");
+ final String secretKey = System.getenv("SK");
+
+ if (accessKey != null || secretKey != null) {
+ Configuration conf = new Configuration();
+ if (accessKey != null) {
+ conf.setString(S3FileSystem.S3_ACCESS_KEY_KEY, accessKey);
+ }
+ if (secretKey != null) {
+ conf.setString(S3FileSystem.S3_SECRET_KEY_KEY, secretKey);
+ }
+ GlobalConfiguration.includeConfiguration(conf);
+ }
+ }
+
+ /**
+ * This test creates and deletes a bucket inside S3 and checks it is correctly displayed inside the directory
+ * listing.
+ */
+ @Test
+ public void createAndDeleteBucketTest() {
+
+ if (!testActivated()) {
+ return;
+ }
+
+ final String bucketName = getRandomName();
+ final Path bucketPath = new Path(S3_BASE_URI + bucketName + Path.SEPARATOR);
+
+ try {
+
+ final FileSystem fs = bucketPath.getFileSystem();
+
+ // Create directory
+ fs.mkdirs(bucketPath);
+
+ // Check if directory is correctly displayed in file system hierarchy
+ final FileStatus[] content = fs.listStatus(new Path(S3_BASE_URI));
+ boolean entryFound = false;
+ for (final FileStatus entry : content) {
+ if (bucketPath.equals(entry.getPath())) {
+ entryFound = true;
+ break;
+ }
+ }
+
+ if (!entryFound) {
+ fail("Cannot find entry " + bucketName + " in directory " + S3_BASE_URI);
+ }
+
+ // Check the concrete directory file status
+ try {
+ final FileStatus directoryFileStatus = fs.getFileStatus(bucketPath);
+ assertTrue(directoryFileStatus.isDir());
+ assertEquals(0L, directoryFileStatus.getAccessTime());
+ assertTrue(directoryFileStatus.getModificationTime() > 0L);
+
+ } catch (FileNotFoundException e) {
+ fail(e.getMessage());
+ }
+
+ // Delete the bucket
+ fs.delete(bucketPath, true);
+
+ // Make sure the bucket no longer exists
+ try {
+ fs.getFileStatus(bucketPath);
+ fail("Expected FileNotFoundException for " + bucketPath.toUri());
+ } catch (FileNotFoundException e) {
+ // This is an expected exception
+ }
+
+ } catch (IOException ioe) {
+ fail(ioe.getMessage());
+ }
+ }
+
+ /**
+ * Creates and reads the a larger test file in S3. The test file is generated according to a specific pattern.
+ * During the read phase the incoming data stream is also checked against this pattern.
+ */
+ @Test
+ public void createAndReadLargeFileTest() {
+
+ try {
+ createAndReadFileTest(LARGE_FILE_SIZE);
+ } catch (IOException ioe) {
+ fail(ioe.getMessage());
+ }
+ }
+
+ /**
+ * Creates and reads the a small test file in S3. The test file is generated according to a specific pattern.
+ * During the read phase the incoming data stream is also checked against this pattern.
+ */
+ @Test
+ public void createAndReadSmallFileTest() {
+
+ try {
+ createAndReadFileTest(SMALL_FILE_SIZE);
+ } catch (IOException ioe) {
+ fail(ioe.getMessage());
+ }
+ }
+
+ /**
+ * The tests checks the mapping of the file system directory structure to the underlying bucket/object model of
+ * Amazon S3.
+ */
+ @Test
+ public void multiLevelDirectoryTest() {
+
+ if (!testActivated()) {
+ return;
+ }
+
+ final String dirName = getRandomName();
+ final String subdirName = getRandomName();
+ final String subsubdirName = getRandomName();
+ final String fileName = getRandomName();
+ final Path dir = new Path(S3_BASE_URI + dirName + Path.SEPARATOR);
+ final Path subdir = new Path(S3_BASE_URI + dirName + Path.SEPARATOR + subdirName + Path.SEPARATOR);
+ final Path subsubdir = new Path(S3_BASE_URI + dirName + Path.SEPARATOR + subdirName + Path.SEPARATOR
+ + subsubdirName + Path.SEPARATOR);
+ final Path file = new Path(S3_BASE_URI + dirName + Path.SEPARATOR + subdirName + Path.SEPARATOR + fileName);
+
+ try {
+
+ final FileSystem fs = dir.getFileSystem();
+
+ fs.mkdirs(subsubdir);
+
+ final OutputStream os = fs.create(file, true);
+ generateTestData(os, SMALL_FILE_SIZE);
+ os.close();
+
+ // On this directory levels there should only be one subdirectory
+ FileStatus[] list = fs.listStatus(dir);
+ int numberOfDirs = 0;
+ int numberOfFiles = 0;
+ for (final FileStatus entry : list) {
+
+ if (entry.isDir()) {
+ ++numberOfDirs;
+ assertEquals(subdir, entry.getPath());
+ } else {
+ fail(entry.getPath() + " is a file which must not appear on this directory level");
+ }
+ }
+
+ assertEquals(1, numberOfDirs);
+ assertEquals(0, numberOfFiles);
+
+ list = fs.listStatus(subdir);
+ numberOfDirs = 0;
+
+ for (final FileStatus entry : list) {
+ if (entry.isDir()) {
+ assertEquals(subsubdir, entry.getPath());
+ ++numberOfDirs;
+ } else {
+ assertEquals(file, entry.getPath());
+ ++numberOfFiles;
+ }
+ }
+
+ assertEquals(1, numberOfDirs);
+ assertEquals(1, numberOfFiles);
+
+ fs.delete(dir, true);
+
+ } catch (IOException ioe) {
+ fail(ioe.getMessage());
+ }
+ }
+
+ /**
+ * This test checks the S3 implementation of the file system method to retrieve the block locations of a file.
+ */
+ @Test
+ public void blockLocationTest() {
+
+ if (!testActivated()) {
+ return;
+ }
+
+ final String dirName = getRandomName();
+ final String fileName = getRandomName();
+ final Path dir = new Path(S3_BASE_URI + dirName + Path.SEPARATOR);
+ final Path file = new Path(S3_BASE_URI + dirName + Path.SEPARATOR + fileName);
+
+ try {
+
+ final FileSystem fs = dir.getFileSystem();
+
+ fs.mkdirs(dir);
+
+ final OutputStream os = fs.create(file, true);
+ generateTestData(os, SMALL_FILE_SIZE);
+ os.close();
+
+ final FileStatus fileStatus = fs.getFileStatus(file);
+ assertNotNull(fileStatus);
+
+ BlockLocation[] blockLocations = fs.getFileBlockLocations(fileStatus, 0, SMALL_FILE_SIZE + 1);
+ assertNull(blockLocations);
+
+ blockLocations = fs.getFileBlockLocations(fileStatus, 0, SMALL_FILE_SIZE);
+ assertEquals(1, blockLocations.length);
+
+ final BlockLocation bl = blockLocations[0];
+ assertNotNull(bl.getHosts());
+ assertEquals(1, bl.getHosts().length);
+ assertEquals(SMALL_FILE_SIZE, bl.getLength());
+ assertEquals(0, bl.getOffset());
+ final URI s3Uri = fs.getUri();
+ assertNotNull(s3Uri);
+ assertEquals(s3Uri.getHost(), bl.getHosts()[0]);
+
+ fs.delete(dir, true);
+
+ } catch (IOException ioe) {
+ fail(ioe.getMessage());
+ }
+ }
+
+ /**
+ * Creates and reads a file with the given size in S3. The test file is generated according to a specific pattern.
+ * During the read phase the incoming data stream is also checked against this pattern.
+ *
+ * @param fileSize
+ * the size of the file to be generated in bytes
+ * @throws IOException
+ * thrown if an I/O error occurs while writing or reading the test file
+ */
+ private void createAndReadFileTest(final int fileSize) throws IOException {
+
+ if (!testActivated()) {
+ return;
+ }
+
+ final String bucketName = getRandomName();
+ final String objectName = getRandomName();
+ final Path bucketPath = new Path(S3_BASE_URI + bucketName + Path.SEPARATOR);
+ final Path objectPath = new Path(S3_BASE_URI + bucketName + Path.SEPARATOR + objectName);
+
+ FileSystem fs = bucketPath.getFileSystem();
+
+ // Create test bucket
+ fs.mkdirs(bucketPath);
+
+ // Write test file to S3
+ final FSDataOutputStream outputStream = fs.create(objectPath, false);
+ generateTestData(outputStream, fileSize);
+ outputStream.close();
+
+ // Now read the same file back from S3
+ final FSDataInputStream inputStream = fs.open(objectPath);
+ testReceivedData(inputStream, fileSize);
+ inputStream.close();
+
+ // Delete test bucket
+ fs.delete(bucketPath, true);
+ }
+
+ /**
+ * Receives test data from the given input stream and checks the size of the data as well as the pattern inside the
+ * received data.
+ *
+ * @param inputStream
+ * the input stream to read the test data from
+ * @param expectedSize
+ * the expected size of the data to be read from the input stream in bytes
+ * @throws IOException
+ * thrown if an error occurs while reading the data
+ */
+ private void testReceivedData(final InputStream inputStream, final int expectedSize) throws IOException {
+
+ final byte[] testBuffer = new byte[TEST_BUFFER_SIZE];
+
+ int totalBytesRead = 0;
+ int nextExpectedNumber = 0;
+ while (true) {
+
+ final int bytesRead = inputStream.read(testBuffer);
+ if (bytesRead < 0) {
+ break;
+ }
+
+ totalBytesRead += bytesRead;
+
+ for (int i = 0; i < bytesRead; ++i) {
+ if (testBuffer[i] != nextExpectedNumber) {
+ throw new IOException("Read number " + testBuffer[i] + " but expected " + nextExpectedNumber);
+ }
+
+ ++nextExpectedNumber;
+
+ if (nextExpectedNumber == MODULUS) {
+ nextExpectedNumber = 0;
+ }
+ }
+ }
+
+ if (totalBytesRead != expectedSize) {
+ throw new IOException("Expected to read " + expectedSize + " bytes but only received " + totalBytesRead);
+ }
+ }
+
+ /**
+ * Generates test data of the given size according to some specific pattern and writes it to the provided output
+ * stream.
+ *
+ * @param outputStream
+ * the output stream to write the data to
+ * @param size
+ * the size of the test data to be generated in bytes
+ * @throws IOException
+ * thrown if an error occurs while writing the data
+ */
+ private void generateTestData(final OutputStream outputStream, final int size) throws IOException {
+
+ final byte[] testBuffer = new byte[TEST_BUFFER_SIZE];
+ for (int i = 0; i < testBuffer.length; ++i) {
+ testBuffer[i] = (byte) (i % MODULUS);
+ }
+
+ int bytesWritten = 0;
+ while (bytesWritten < size) {
+
+ final int diff = size - bytesWritten;
+ if (diff < testBuffer.length) {
+ outputStream.write(testBuffer, 0, diff);
+ bytesWritten += diff;
+ } else {
+ outputStream.write(testBuffer);
+ bytesWritten += testBuffer.length;
+ }
+ }
+ }
+
+ /**
+ * Generates a random name.
+ *
+ * @return a random name
+ */
+ private String getRandomName() {
+
+ final StringBuilder stringBuilder = new StringBuilder();
+ for (int i = 0; i < NAME_LENGTH; ++i) {
+ final char c = ALPHABET[(int) (Math.random() * (double) ALPHABET.length)];
+ stringBuilder.append(c);
+ }
+
+ return stringBuilder.toString();
+ }
+
+ /**
+ * Checks whether the AWS access key and the AWS secret keys have been successfully loaded from the configuration
+ * and whether the S3 tests shall be performed.
+ *
+ * @return <code>true</code> if the tests shall be performed, <code>false</code> if the tests shall be skipped
+ * because at least one AWS key is missing
+ */
+ private boolean testActivated() {
+
+ final String accessKey = GlobalConfiguration.getString(S3FileSystem.S3_ACCESS_KEY_KEY, null);
+ final String secretKey = GlobalConfiguration.getString(S3FileSystem.S3_SECRET_KEY_KEY, null);
+
+ if (accessKey != null && secretKey != null) {
+ return true;
+ }
+
+ return false;
+ }
+}