You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/06/09 20:30:43 UTC

[08/30] Offer buffer-oriented API for I/O (#25)

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-tests/src/test/java/eu/stratosphere/test/broadcastvars/KMeansIterativeNepheleITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/broadcastvars/KMeansIterativeNepheleITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/broadcastvars/KMeansIterativeNepheleITCase.java
index 56c2a8e..c9323a6 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/broadcastvars/KMeansIterativeNepheleITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/broadcastvars/KMeansIterativeNepheleITCase.java
@@ -42,6 +42,7 @@ import eu.stratosphere.pact.runtime.task.GroupReduceDriver;
 import eu.stratosphere.pact.runtime.task.chaining.ChainedCollectorMapDriver;
 import eu.stratosphere.pact.runtime.task.util.LocalStrategy;
 import eu.stratosphere.pact.runtime.task.util.TaskConfig;
+import eu.stratosphere.runtime.io.channels.ChannelType;
 import eu.stratosphere.test.iterative.nephele.JobGraphUtils;
 import eu.stratosphere.test.recordJobs.kmeans.KMeansBroadcast.PointBuilder;
 import eu.stratosphere.test.recordJobs.kmeans.KMeansBroadcast.RecomputeClusterCenter;
@@ -303,8 +304,6 @@ public class KMeansIterativeNepheleITCase extends RecordAPITestBase {
 		JobGraphUtils.connect(head, output, ChannelType.NETWORK, DistributionPattern.POINTWISE);
 		
 		JobGraphUtils.connect(head, sync, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
-		
-		
 
 		// -- instance sharing -------------------------------------------------------------------------------------
 		points.setVertexToShareInstancesWith(output);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/JobGraphUtils.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/JobGraphUtils.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/JobGraphUtils.java
index bac36bd..a229086 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/JobGraphUtils.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/JobGraphUtils.java
@@ -22,8 +22,8 @@ import eu.stratosphere.api.common.io.InputFormat;
 import eu.stratosphere.configuration.Configuration;
 import eu.stratosphere.nephele.client.JobClient;
 import eu.stratosphere.nephele.client.JobExecutionException;
-import eu.stratosphere.nephele.io.DistributionPattern;
-import eu.stratosphere.nephele.io.channels.ChannelType;
+import eu.stratosphere.nephele.jobgraph.DistributionPattern;
+import eu.stratosphere.runtime.io.channels.ChannelType;
 import eu.stratosphere.nephele.jobgraph.AbstractJobVertex;
 import eu.stratosphere.nephele.jobgraph.JobGraph;
 import eu.stratosphere.nephele.jobgraph.JobGraphDefinitionException;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java
index 906ccc0..06badd9 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java
@@ -20,8 +20,8 @@ import eu.stratosphere.api.common.typeutils.TypeSerializerFactory;
 import eu.stratosphere.api.java.record.io.FileOutputFormat;
 import eu.stratosphere.configuration.Configuration;
 import eu.stratosphere.core.fs.Path;
-import eu.stratosphere.nephele.io.DistributionPattern;
-import eu.stratosphere.nephele.io.channels.ChannelType;
+import eu.stratosphere.nephele.jobgraph.DistributionPattern;
+import eu.stratosphere.runtime.io.channels.ChannelType;
 import eu.stratosphere.nephele.jobgraph.JobGraph;
 import eu.stratosphere.nephele.jobgraph.JobInputVertex;
 import eu.stratosphere.nephele.jobgraph.JobOutputVertex;
@@ -285,7 +285,7 @@ public class CustomCompensatableDanglingPageRank {
 
 		JobGraphUtils.connect(pageWithRankInput, head, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
 
-		JobGraphUtils.connect(head, intermediate, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
+		JobGraphUtils.connect(head, intermediate, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
 		intermediateConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
 		
 		JobGraphUtils.connect(adjacencyListInput, intermediate, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
@@ -295,8 +295,8 @@ public class CustomCompensatableDanglingPageRank {
 		tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
 		tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(1, degreeOfParallelism);
 
-		JobGraphUtils.connect(head, output, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
-		JobGraphUtils.connect(tail, fakeTailOutput, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
+		JobGraphUtils.connect(head, output, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
+		JobGraphUtils.connect(tail, fakeTailOutput, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
 
 		JobGraphUtils.connect(head, sync, ChannelType.NETWORK, DistributionPattern.POINTWISE);
 		

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java
index 6edcef3..3e51808 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java
@@ -20,8 +20,8 @@ import eu.stratosphere.api.common.typeutils.TypeSerializerFactory;
 import eu.stratosphere.api.java.record.io.FileOutputFormat;
 import eu.stratosphere.configuration.Configuration;
 import eu.stratosphere.core.fs.Path;
-import eu.stratosphere.nephele.io.DistributionPattern;
-import eu.stratosphere.nephele.io.channels.ChannelType;
+import eu.stratosphere.nephele.jobgraph.DistributionPattern;
+import eu.stratosphere.runtime.io.channels.ChannelType;
 import eu.stratosphere.nephele.jobgraph.JobGraph;
 import eu.stratosphere.nephele.jobgraph.JobInputVertex;
 import eu.stratosphere.nephele.jobgraph.JobOutputVertex;
@@ -297,7 +297,7 @@ public class CustomCompensatableDanglingPageRankWithCombiner {
 
 		JobGraphUtils.connect(pageWithRankInput, head, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
 
-		JobGraphUtils.connect(head, intermediate, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
+		JobGraphUtils.connect(head, intermediate, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
 		intermediateConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
 		
 		JobGraphUtils.connect(adjacencyListInput, intermediate, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
@@ -307,8 +307,8 @@ public class CustomCompensatableDanglingPageRankWithCombiner {
 		tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
 		tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(1, degreeOfParallelism);
 
-		JobGraphUtils.connect(head, output, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
-		JobGraphUtils.connect(tail, fakeTailOutput, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
+		JobGraphUtils.connect(head, output, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
+		JobGraphUtils.connect(tail, fakeTailOutput, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
 
 		JobGraphUtils.connect(head, sync, ChannelType.NETWORK, DistributionPattern.POINTWISE);
 		

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java
index ccf529a..b50c33e 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java
@@ -19,8 +19,8 @@ import eu.stratosphere.api.common.typeutils.TypePairComparatorFactory;
 import eu.stratosphere.api.common.typeutils.TypeSerializerFactory;
 import eu.stratosphere.api.java.record.io.FileOutputFormat;
 import eu.stratosphere.configuration.Configuration;
-import eu.stratosphere.nephele.io.DistributionPattern;
-import eu.stratosphere.nephele.io.channels.ChannelType;
+import eu.stratosphere.nephele.jobgraph.DistributionPattern;
+import eu.stratosphere.runtime.io.channels.ChannelType;
 import eu.stratosphere.nephele.jobgraph.JobGraph;
 import eu.stratosphere.nephele.jobgraph.JobInputVertex;
 import eu.stratosphere.nephele.jobgraph.JobOutputVertex;
@@ -265,7 +265,7 @@ public class CompensatableDanglingPageRank {
 
 		JobGraphUtils.connect(pageWithRankInput, head, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
 
-		JobGraphUtils.connect(head, intermediate, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
+		JobGraphUtils.connect(head, intermediate, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
 		intermediateConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
 		
 		JobGraphUtils.connect(adjacencyListInput, intermediate, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
@@ -275,8 +275,8 @@ public class CompensatableDanglingPageRank {
 		tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
 		tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(1, degreeOfParallelism);
 
-		JobGraphUtils.connect(head, output, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
-		JobGraphUtils.connect(tail, fakeTailOutput, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
+		JobGraphUtils.connect(head, output, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
+		JobGraphUtils.connect(tail, fakeTailOutput, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
 
 		JobGraphUtils.connect(head, sync, ChannelType.NETWORK, DistributionPattern.POINTWISE);
 		

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobs/relational/query1Util/LineItemFilterTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobs/relational/query1Util/LineItemFilterTest.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobs/relational/query1Util/LineItemFilterTest.java
index 03e442e..04767c6 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobs/relational/query1Util/LineItemFilterTest.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobs/relational/query1Util/LineItemFilterTest.java
@@ -21,12 +21,12 @@ import static org.mockito.MockitoAnnotations.initMocks;
 import java.util.ArrayList;
 import java.util.List;
 
+import eu.stratosphere.runtime.io.api.RecordWriter;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 
-import eu.stratosphere.nephele.io.AbstractRecordWriter;
 import eu.stratosphere.pact.runtime.shipping.RecordOutputCollector;
 import eu.stratosphere.test.recordJobs.util.Tuple;
 import eu.stratosphere.types.IntValue;
@@ -39,9 +39,9 @@ public class LineItemFilterTest {
 	private static final String RETURN_FLAG = "N";
 	
 	@Mock
-	AbstractRecordWriter<Record> recordWriterMock; 
+	RecordWriter<Record> recordWriterMock;
 	
-	private List<AbstractRecordWriter<Record>> writerList = new ArrayList<AbstractRecordWriter<Record>>();
+	private List<RecordWriter<Record>> writerList = new ArrayList<RecordWriter<Record>>();
 	
 	@Before
 	public void setUp()