You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/06/10 21:35:05 UTC
[08/34] Offer buffer-oriented API for I/O (#25)
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-tests/src/test/java/eu/stratosphere/test/broadcastvars/KMeansIterativeNepheleITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/broadcastvars/KMeansIterativeNepheleITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/broadcastvars/KMeansIterativeNepheleITCase.java
index 56c2a8e..c9323a6 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/broadcastvars/KMeansIterativeNepheleITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/broadcastvars/KMeansIterativeNepheleITCase.java
@@ -42,6 +42,7 @@ import eu.stratosphere.pact.runtime.task.GroupReduceDriver;
import eu.stratosphere.pact.runtime.task.chaining.ChainedCollectorMapDriver;
import eu.stratosphere.pact.runtime.task.util.LocalStrategy;
import eu.stratosphere.pact.runtime.task.util.TaskConfig;
+import eu.stratosphere.runtime.io.channels.ChannelType;
import eu.stratosphere.test.iterative.nephele.JobGraphUtils;
import eu.stratosphere.test.recordJobs.kmeans.KMeansBroadcast.PointBuilder;
import eu.stratosphere.test.recordJobs.kmeans.KMeansBroadcast.RecomputeClusterCenter;
@@ -303,8 +304,6 @@ public class KMeansIterativeNepheleITCase extends RecordAPITestBase {
JobGraphUtils.connect(head, output, ChannelType.NETWORK, DistributionPattern.POINTWISE);
JobGraphUtils.connect(head, sync, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
-
-
// -- instance sharing -------------------------------------------------------------------------------------
points.setVertexToShareInstancesWith(output);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/JobGraphUtils.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/JobGraphUtils.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/JobGraphUtils.java
index bac36bd..a229086 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/JobGraphUtils.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/JobGraphUtils.java
@@ -22,8 +22,8 @@ import eu.stratosphere.api.common.io.InputFormat;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.client.JobClient;
import eu.stratosphere.nephele.client.JobExecutionException;
-import eu.stratosphere.nephele.io.DistributionPattern;
-import eu.stratosphere.nephele.io.channels.ChannelType;
+import eu.stratosphere.nephele.jobgraph.DistributionPattern;
+import eu.stratosphere.runtime.io.channels.ChannelType;
import eu.stratosphere.nephele.jobgraph.AbstractJobVertex;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.nephele.jobgraph.JobGraphDefinitionException;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java
index 906ccc0..06badd9 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java
@@ -20,8 +20,8 @@ import eu.stratosphere.api.common.typeutils.TypeSerializerFactory;
import eu.stratosphere.api.java.record.io.FileOutputFormat;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.core.fs.Path;
-import eu.stratosphere.nephele.io.DistributionPattern;
-import eu.stratosphere.nephele.io.channels.ChannelType;
+import eu.stratosphere.nephele.jobgraph.DistributionPattern;
+import eu.stratosphere.runtime.io.channels.ChannelType;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.nephele.jobgraph.JobInputVertex;
import eu.stratosphere.nephele.jobgraph.JobOutputVertex;
@@ -285,7 +285,7 @@ public class CustomCompensatableDanglingPageRank {
JobGraphUtils.connect(pageWithRankInput, head, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
- JobGraphUtils.connect(head, intermediate, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
+ JobGraphUtils.connect(head, intermediate, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
intermediateConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
JobGraphUtils.connect(adjacencyListInput, intermediate, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
@@ -295,8 +295,8 @@ public class CustomCompensatableDanglingPageRank {
tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(1, degreeOfParallelism);
- JobGraphUtils.connect(head, output, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
- JobGraphUtils.connect(tail, fakeTailOutput, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
+ JobGraphUtils.connect(head, output, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
+ JobGraphUtils.connect(tail, fakeTailOutput, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
JobGraphUtils.connect(head, sync, ChannelType.NETWORK, DistributionPattern.POINTWISE);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java
index 6edcef3..3e51808 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java
@@ -20,8 +20,8 @@ import eu.stratosphere.api.common.typeutils.TypeSerializerFactory;
import eu.stratosphere.api.java.record.io.FileOutputFormat;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.core.fs.Path;
-import eu.stratosphere.nephele.io.DistributionPattern;
-import eu.stratosphere.nephele.io.channels.ChannelType;
+import eu.stratosphere.nephele.jobgraph.DistributionPattern;
+import eu.stratosphere.runtime.io.channels.ChannelType;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.nephele.jobgraph.JobInputVertex;
import eu.stratosphere.nephele.jobgraph.JobOutputVertex;
@@ -297,7 +297,7 @@ public class CustomCompensatableDanglingPageRankWithCombiner {
JobGraphUtils.connect(pageWithRankInput, head, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
- JobGraphUtils.connect(head, intermediate, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
+ JobGraphUtils.connect(head, intermediate, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
intermediateConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
JobGraphUtils.connect(adjacencyListInput, intermediate, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
@@ -307,8 +307,8 @@ public class CustomCompensatableDanglingPageRankWithCombiner {
tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(1, degreeOfParallelism);
- JobGraphUtils.connect(head, output, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
- JobGraphUtils.connect(tail, fakeTailOutput, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
+ JobGraphUtils.connect(head, output, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
+ JobGraphUtils.connect(tail, fakeTailOutput, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
JobGraphUtils.connect(head, sync, ChannelType.NETWORK, DistributionPattern.POINTWISE);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java
index ccf529a..b50c33e 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java
@@ -19,8 +19,8 @@ import eu.stratosphere.api.common.typeutils.TypePairComparatorFactory;
import eu.stratosphere.api.common.typeutils.TypeSerializerFactory;
import eu.stratosphere.api.java.record.io.FileOutputFormat;
import eu.stratosphere.configuration.Configuration;
-import eu.stratosphere.nephele.io.DistributionPattern;
-import eu.stratosphere.nephele.io.channels.ChannelType;
+import eu.stratosphere.nephele.jobgraph.DistributionPattern;
+import eu.stratosphere.runtime.io.channels.ChannelType;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.nephele.jobgraph.JobInputVertex;
import eu.stratosphere.nephele.jobgraph.JobOutputVertex;
@@ -265,7 +265,7 @@ public class CompensatableDanglingPageRank {
JobGraphUtils.connect(pageWithRankInput, head, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
- JobGraphUtils.connect(head, intermediate, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
+ JobGraphUtils.connect(head, intermediate, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
intermediateConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
JobGraphUtils.connect(adjacencyListInput, intermediate, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
@@ -275,8 +275,8 @@ public class CompensatableDanglingPageRank {
tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(1, degreeOfParallelism);
- JobGraphUtils.connect(head, output, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
- JobGraphUtils.connect(tail, fakeTailOutput, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
+ JobGraphUtils.connect(head, output, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
+ JobGraphUtils.connect(tail, fakeTailOutput, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
JobGraphUtils.connect(head, sync, ChannelType.NETWORK, DistributionPattern.POINTWISE);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobs/relational/query1Util/LineItemFilterTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobs/relational/query1Util/LineItemFilterTest.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobs/relational/query1Util/LineItemFilterTest.java
index 03e442e..04767c6 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobs/relational/query1Util/LineItemFilterTest.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobs/relational/query1Util/LineItemFilterTest.java
@@ -21,12 +21,12 @@ import static org.mockito.MockitoAnnotations.initMocks;
import java.util.ArrayList;
import java.util.List;
+import eu.stratosphere.runtime.io.api.RecordWriter;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
-import eu.stratosphere.nephele.io.AbstractRecordWriter;
import eu.stratosphere.pact.runtime.shipping.RecordOutputCollector;
import eu.stratosphere.test.recordJobs.util.Tuple;
import eu.stratosphere.types.IntValue;
@@ -39,9 +39,9 @@ public class LineItemFilterTest {
private static final String RETURN_FLAG = "N";
@Mock
- AbstractRecordWriter<Record> recordWriterMock;
+ RecordWriter<Record> recordWriterMock;
- private List<AbstractRecordWriter<Record>> writerList = new ArrayList<AbstractRecordWriter<Record>>();
+ private List<RecordWriter<Record>> writerList = new ArrayList<RecordWriter<Record>>();
@Before
public void setUp()