You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/30 23:44:39 UTC

[4/5] flink git commit: [FLINK-1681] [tests] Remove outdated 'nephele' iteration tests.

http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java
deleted file mode 100644
index a6e6b6e..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java
+++ /dev/null
@@ -1,315 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.iterative.nephele.customdanglingpagerank;
-
-import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
-import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
-import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.api.java.record.io.FileOutputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.iterative.task.IterationHeadPactTask;
-import org.apache.flink.runtime.iterative.task.IterationIntermediatePactTask;
-import org.apache.flink.runtime.iterative.task.IterationTailPactTask;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.DistributionPattern;
-import org.apache.flink.runtime.jobgraph.InputFormatVertex;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.OutputFormatVertex;
-import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.operators.BuildSecondCachedJoinDriver;
-import org.apache.flink.runtime.operators.CoGroupDriver;
-import org.apache.flink.runtime.operators.CollectorMapDriver;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.apache.flink.runtime.operators.util.TaskConfig;
-import org.apache.flink.test.iterative.nephele.JobGraphUtils;
-import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithAdjacencyList;
-import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithAdjacencyListComparatorFactory;
-import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithAdjacencyListSerializerFactory;
-import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRank;
-import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRankAndDangling;
-import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRankAndDanglingComparatorFactory;
-import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRankAndDanglingSerializerFactory;
-import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRankComparatorFactory;
-import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRankDanglingToVertexWithAdjacencyListPairComparatorFactory;
-import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRankDanglingToVertexWithRankPairComparatorFactory;
-import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRankSerializerFactory;
-import org.apache.flink.test.iterative.nephele.danglingpagerank.DiffL1NormConvergenceCriterion;
-import org.apache.flink.test.iterative.nephele.danglingpagerank.PageRankStatsAggregator;
-import org.apache.flink.test.util.TestBaseUtils;
-
-public class CustomCompensatableDanglingPageRank {
-	
-	private static final int NUM_FILE_HANDLES_PER_SORT = 64;
-	
-	private static final float SORT_SPILL_THRESHOLD = 0.85f;
-	
-	private static final int ITERATION_ID = 1;
-	
-	
-	private static TypeSerializerFactory<VertexWithRank> vertexWithRankSerializer = new VertexWithRankSerializerFactory();
-	
-	private static TypeSerializerFactory<VertexWithRankAndDangling> vertexWithRankAndDanglingSerializer = new VertexWithRankAndDanglingSerializerFactory();
-	
-	private static TypeSerializerFactory<VertexWithAdjacencyList> vertexWithAdjacencyListSerializer = new VertexWithAdjacencyListSerializerFactory();
-	
-	private static TypeComparatorFactory<VertexWithRank> vertexWithRankComparator = new VertexWithRankComparatorFactory();
-	
-	private static TypeComparatorFactory<VertexWithRankAndDangling> vertexWithRankAndDanglingComparator = new VertexWithRankAndDanglingComparatorFactory();
-	
-	private static TypeComparatorFactory<VertexWithAdjacencyList> vertexWithAdjacencyListComparator = new VertexWithAdjacencyListComparatorFactory();
-	
-	private static TypePairComparatorFactory<VertexWithRankAndDangling, VertexWithAdjacencyList> matchComparator =
-			new VertexWithRankDanglingToVertexWithAdjacencyListPairComparatorFactory();
-	
-	private static TypePairComparatorFactory<VertexWithRankAndDangling, VertexWithRank> coGroupComparator =
-			new VertexWithRankDanglingToVertexWithRankPairComparatorFactory();
-	
-
-//	public static void main(String[] args) throws Exception {
-//		String confPath = args.length >= 6 ? confPath = args[5] : PlayConstants.PLAY_DIR + "local-conf";
-//		
-//		GlobalConfiguration.loadConfiguration(confPath);
-//		Configuration conf = GlobalConfiguration.getConfiguration();
-//		
-//		JobGraph jobGraph = getJobGraph(args);
-//		JobGraphUtils.submit(jobGraph, conf);
-//	}
-		
-	public static JobGraph getJobGraph(String[] args) throws Exception {
-
-		int parallelism = 2;
-		String pageWithRankInputPath = ""; //"file://" + PlayConstants.PLAY_DIR + "test-inputs/danglingpagerank/pageWithRank";
-		String adjacencyListInputPath = ""; //"file://" + PlayConstants.PLAY_DIR +
-//			"test-inputs/danglingpagerank/adjacencylists";
-		String outputPath = TestBaseUtils.constructTestURI(CustomCompensatableDanglingPageRank.class, "flink_iterations");
-//		String confPath = PlayConstants.PLAY_DIR + "local-conf";
-		int minorConsumer = 2;
-		int matchMemory = 5;
-		int coGroupSortMemory = 5;
-		int numIterations = 25;
-		long numVertices = 5;
-		long numDanglingVertices = 1;
-
-		String failingWorkers = "1";
-		int failingIteration = 2;
-		double messageLoss = 0.75;
-
-		if (args.length >= 14) {
-			parallelism = Integer.parseInt(args[0]);
-			pageWithRankInputPath = args[1];
-			adjacencyListInputPath = args[2];
-			outputPath = args[3];
-//			confPath = args[4];
-			minorConsumer = Integer.parseInt(args[5]);
-			matchMemory = Integer.parseInt(args[6]);
-			coGroupSortMemory = Integer.parseInt(args[7]);
-			numIterations = Integer.parseInt(args[8]);
-			numVertices = Long.parseLong(args[9]);
-			numDanglingVertices = Long.parseLong(args[10]);
-			failingWorkers = args[11];
-			failingIteration = Integer.parseInt(args[12]);
-			messageLoss = Double.parseDouble(args[13]);
-		}
-
-		int totalMemoryConsumption = 3*minorConsumer + matchMemory + coGroupSortMemory;
-
-		JobGraph jobGraph = new JobGraph("CompensatableDanglingPageRank");
-		
-		// --------------- the inputs ---------------------
-
-		// page rank input
-		InputFormatVertex pageWithRankInput = JobGraphUtils.createInput(new CustomImprovedDanglingPageRankInputFormat(),
-			pageWithRankInputPath, "DanglingPageWithRankInput", jobGraph, parallelism);
-		TaskConfig pageWithRankInputConfig = new TaskConfig(pageWithRankInput.getConfiguration());
-		pageWithRankInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
-		pageWithRankInputConfig.setOutputComparator(vertexWithRankAndDanglingComparator, 0);
-		pageWithRankInputConfig.setOutputSerializer(vertexWithRankAndDanglingSerializer);
-		pageWithRankInputConfig.setStubParameter("pageRank.numVertices", String.valueOf(numVertices));
-
-		// edges as adjacency list
-		InputFormatVertex adjacencyListInput = JobGraphUtils.createInput(new CustomImprovedAdjacencyListInputFormat(),
-			adjacencyListInputPath, "AdjancencyListInput", jobGraph, parallelism);
-		TaskConfig adjacencyListInputConfig = new TaskConfig(adjacencyListInput.getConfiguration());
-		adjacencyListInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
-		adjacencyListInputConfig.setOutputSerializer(vertexWithAdjacencyListSerializer);
-		adjacencyListInputConfig.setOutputComparator(vertexWithAdjacencyListComparator, 0);
-
-		// --------------- the head ---------------------
-		JobVertex head = JobGraphUtils.createTask(IterationHeadPactTask.class, "IterationHead", jobGraph,
-			parallelism);
-		TaskConfig headConfig = new TaskConfig(head.getConfiguration());
-		headConfig.setIterationId(ITERATION_ID);
-		
-		// initial input / partial solution
-		headConfig.addInputToGroup(0);
-		headConfig.setIterationHeadPartialSolutionOrWorksetInputIndex(0);
-		headConfig.setInputSerializer(vertexWithRankAndDanglingSerializer, 0);
-		headConfig.setInputComparator(vertexWithRankAndDanglingComparator, 0);
-		headConfig.setInputLocalStrategy(0, LocalStrategy.SORT);
-		headConfig.setRelativeMemoryInput(0, (double) minorConsumer / totalMemoryConsumption);
-		headConfig.setFilehandlesInput(0, NUM_FILE_HANDLES_PER_SORT);
-		headConfig.setSpillingThresholdInput(0, SORT_SPILL_THRESHOLD);
-		
-		// back channel / iterations
-		headConfig.setRelativeBackChannelMemory((double) minorConsumer / totalMemoryConsumption);
-		
-		// output into iteration
-		headConfig.setOutputSerializer(vertexWithRankAndDanglingSerializer);
-		headConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
-		headConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
-		
-		// final output
-		TaskConfig headFinalOutConfig = new TaskConfig(new Configuration());
-		headFinalOutConfig.setOutputSerializer(vertexWithRankAndDanglingSerializer);
-		headFinalOutConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
-		headConfig.setIterationHeadFinalOutputConfig(headFinalOutConfig);
-		
-		// the sync
-		headConfig.setIterationHeadIndexOfSyncOutput(3);
-		headConfig.setNumberOfIterations(numIterations);
-		
-		// the driver 
-		headConfig.setDriver(CollectorMapDriver.class);
-		headConfig.setDriverStrategy(DriverStrategy.COLLECTOR_MAP);
-		headConfig.setStubWrapper(new UserCodeClassWrapper<CustomCompensatingMap>(CustomCompensatingMap.class));
-		headConfig.setStubParameter("pageRank.numVertices", String.valueOf(numVertices));
-		headConfig.setStubParameter("compensation.failingWorker", failingWorkers);
-		headConfig.setStubParameter("compensation.failingIteration", String.valueOf(failingIteration));
-		headConfig.setStubParameter("compensation.messageLoss", String.valueOf(messageLoss));
-		headConfig.addIterationAggregator(CustomCompensatableDotProductCoGroup.AGGREGATOR_NAME, new PageRankStatsAggregator());
-
-		// --------------- the join ---------------------
-		
-		JobVertex intermediate = JobGraphUtils.createTask(IterationIntermediatePactTask.class,
-			"IterationIntermediate", jobGraph, parallelism);
-		TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration());
-		intermediateConfig.setIterationId(ITERATION_ID);
-//		intermediateConfig.setDriver(RepeatableHashjoinMatchDriverWithCachedBuildside.class);
-		intermediateConfig.setDriver(BuildSecondCachedJoinDriver.class);
-		intermediateConfig.setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
-		intermediateConfig.setRelativeMemoryDriver((double) matchMemory / totalMemoryConsumption);
-		intermediateConfig.addInputToGroup(0);
-		intermediateConfig.addInputToGroup(1);
-		intermediateConfig.setInputSerializer(vertexWithRankAndDanglingSerializer, 0);
-		intermediateConfig.setInputSerializer(vertexWithAdjacencyListSerializer, 1);
-		intermediateConfig.setDriverComparator(vertexWithRankAndDanglingComparator, 0);
-		intermediateConfig.setDriverComparator(vertexWithAdjacencyListComparator, 1);
-		intermediateConfig.setDriverPairComparator(matchComparator);
-		
-		intermediateConfig.setOutputSerializer(vertexWithRankSerializer);
-		intermediateConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
-		intermediateConfig.setOutputComparator(vertexWithRankComparator, 0);
-		
-		intermediateConfig.setStubWrapper(new UserCodeClassWrapper<CustomCompensatableDotProductMatch>(CustomCompensatableDotProductMatch.class));
-		intermediateConfig.setStubParameter("pageRank.numVertices", String.valueOf(numVertices));
-		intermediateConfig.setStubParameter("compensation.failingWorker", failingWorkers);
-		intermediateConfig.setStubParameter("compensation.failingIteration", String.valueOf(failingIteration));
-		intermediateConfig.setStubParameter("compensation.messageLoss", String.valueOf(messageLoss));
-
-		// ---------------- the tail (co group) --------------------
-		
-		JobVertex tail = JobGraphUtils.createTask(IterationTailPactTask.class, "IterationTail", jobGraph,
-			parallelism);
-		TaskConfig tailConfig = new TaskConfig(tail.getConfiguration());
-		tailConfig.setIterationId(ITERATION_ID);
-		tailConfig.setIsWorksetUpdate();
-		
-		// inputs and driver
-		tailConfig.setDriver(CoGroupDriver.class);
-		tailConfig.setDriverStrategy(DriverStrategy.CO_GROUP);
-		tailConfig.addInputToGroup(0);
-		tailConfig.addInputToGroup(1);
-		tailConfig.setInputSerializer(vertexWithRankAndDanglingSerializer, 0);
-		tailConfig.setInputSerializer(vertexWithRankSerializer, 1);
-		tailConfig.setDriverComparator(vertexWithRankAndDanglingComparator, 0);
-		tailConfig.setDriverComparator(vertexWithRankComparator, 1);
-		tailConfig.setDriverPairComparator(coGroupComparator);
-		tailConfig.setInputAsynchronouslyMaterialized(0, true);
-		tailConfig.setRelativeInputMaterializationMemory(0, (double)minorConsumer/totalMemoryConsumption);
-		tailConfig.setInputLocalStrategy(1, LocalStrategy.SORT);
-		tailConfig.setInputComparator(vertexWithRankComparator, 1);
-		tailConfig.setRelativeMemoryInput(1, (double) coGroupSortMemory / totalMemoryConsumption);
-		tailConfig.setFilehandlesInput(1, NUM_FILE_HANDLES_PER_SORT);
-		tailConfig.setSpillingThresholdInput(1, SORT_SPILL_THRESHOLD);
-		
-		// output
-		tailConfig.setOutputSerializer(vertexWithRankAndDanglingSerializer);
-		
-		// the stub
-		tailConfig.setStubWrapper(new UserCodeClassWrapper<CustomCompensatableDotProductCoGroup>(CustomCompensatableDotProductCoGroup.class));
-		tailConfig.setStubParameter("pageRank.numVertices", String.valueOf(numVertices));
-		tailConfig.setStubParameter("pageRank.numDanglingVertices", String.valueOf(numDanglingVertices));
-		tailConfig.setStubParameter("compensation.failingWorker", failingWorkers);
-		tailConfig.setStubParameter("compensation.failingIteration", String.valueOf(failingIteration));
-		tailConfig.setStubParameter("compensation.messageLoss", String.valueOf(messageLoss));
-		
-		// --------------- the output ---------------------
-
-		OutputFormatVertex output = JobGraphUtils.createFileOutput(jobGraph, "FinalOutput", parallelism);
-		TaskConfig outputConfig = new TaskConfig(output.getConfiguration());
-		outputConfig.addInputToGroup(0);
-		outputConfig.setInputSerializer(vertexWithRankAndDanglingSerializer, 0);
-		outputConfig.setStubWrapper(new UserCodeClassWrapper<CustomPageWithRankOutFormat>(CustomPageWithRankOutFormat.class));
-		outputConfig.setStubParameter(FileOutputFormat.FILE_PARAMETER_KEY, outputPath);
-		
-		// --------------- the auxiliaries ---------------------
-
-		JobVertex sync = JobGraphUtils.createSync(jobGraph, parallelism);
-		TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
-		syncConfig.setNumberOfIterations(numIterations);
-		syncConfig.addIterationAggregator(CustomCompensatableDotProductCoGroup.AGGREGATOR_NAME, new PageRankStatsAggregator());
-		syncConfig.setConvergenceCriterion(CustomCompensatableDotProductCoGroup.AGGREGATOR_NAME, new DiffL1NormConvergenceCriterion());
-		syncConfig.setIterationId(ITERATION_ID);
-		
-		// --------------- the wiring ---------------------
-
-		JobGraphUtils.connect(pageWithRankInput, head, DistributionPattern.ALL_TO_ALL);
-
-		JobGraphUtils.connect(head, intermediate, DistributionPattern.POINTWISE);
-		intermediateConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
-		
-		JobGraphUtils.connect(adjacencyListInput, intermediate, DistributionPattern.ALL_TO_ALL);
-		
-		JobGraphUtils.connect(head, tail, DistributionPattern.POINTWISE);
-		JobGraphUtils.connect(intermediate, tail, DistributionPattern.ALL_TO_ALL);
-		tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
-		tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(1, parallelism);
-
-		JobGraphUtils.connect(head, output, DistributionPattern.POINTWISE);
-
-		JobGraphUtils.connect(head, sync, DistributionPattern.POINTWISE);
-		
-		SlotSharingGroup sharingGroup = new SlotSharingGroup();
-		pageWithRankInput.setSlotSharingGroup(sharingGroup);
-		adjacencyListInput.setSlotSharingGroup(sharingGroup);
-		head.setSlotSharingGroup(sharingGroup);
-		intermediate.setSlotSharingGroup(sharingGroup);
-		tail.setSlotSharingGroup(sharingGroup);
-		output.setSlotSharingGroup(sharingGroup);
-		sync.setSlotSharingGroup(sharingGroup);
-		
-		tail.setStrictlyCoLocatedWith(head);
-		intermediate.setStrictlyCoLocatedWith(head);
-
-		return jobGraph;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java
deleted file mode 100644
index 0bf258f..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java
+++ /dev/null
@@ -1,329 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.iterative.nephele.customdanglingpagerank;
-
-import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
-import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
-import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.api.java.record.io.FileOutputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.iterative.task.IterationHeadPactTask;
-import org.apache.flink.runtime.iterative.task.IterationIntermediatePactTask;
-import org.apache.flink.runtime.iterative.task.IterationTailPactTask;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.DistributionPattern;
-import org.apache.flink.runtime.jobgraph.InputFormatVertex;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.OutputFormatVertex;
-import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.operators.BuildSecondCachedJoinDriver;
-import org.apache.flink.runtime.operators.CoGroupDriver;
-import org.apache.flink.runtime.operators.CollectorMapDriver;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.chaining.SynchronousChainedCombineDriver;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.apache.flink.runtime.operators.util.TaskConfig;
-import org.apache.flink.test.iterative.nephele.JobGraphUtils;
-import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithAdjacencyList;
-import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithAdjacencyListComparatorFactory;
-import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithAdjacencyListSerializerFactory;
-import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRank;
-import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRankAndDangling;
-import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRankAndDanglingComparatorFactory;
-import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRankAndDanglingSerializerFactory;
-import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRankComparatorFactory;
-import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRankDanglingToVertexWithAdjacencyListPairComparatorFactory;
-import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRankDanglingToVertexWithRankPairComparatorFactory;
-import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRankSerializerFactory;
-import org.apache.flink.test.iterative.nephele.danglingpagerank.DiffL1NormConvergenceCriterion;
-import org.apache.flink.test.iterative.nephele.danglingpagerank.PageRankStatsAggregator;
-import org.apache.flink.test.util.TestBaseUtils;
-
-public class CustomCompensatableDanglingPageRankWithCombiner {
-	
-	private static final int NUM_FILE_HANDLES_PER_SORT = 64;
-	
-	private static final float SORT_SPILL_THRESHOLD = 0.85f;
-	
-	private static final int ITERATION_ID = 1;
-	
-	
-	private static TypeSerializerFactory<VertexWithRank> vertexWithRankSerializer = new VertexWithRankSerializerFactory();
-	
-	private static TypeSerializerFactory<VertexWithRankAndDangling> vertexWithRankAndDanglingSerializer = new VertexWithRankAndDanglingSerializerFactory();
-	
-	private static TypeSerializerFactory<VertexWithAdjacencyList> vertexWithAdjacencyListSerializer = new VertexWithAdjacencyListSerializerFactory();
-	
-	private static TypeComparatorFactory<VertexWithRank> vertexWithRankComparator = new VertexWithRankComparatorFactory();
-	
-	private static TypeComparatorFactory<VertexWithRankAndDangling> vertexWithRankAndDanglingComparator = new VertexWithRankAndDanglingComparatorFactory();
-	
-	private static TypeComparatorFactory<VertexWithAdjacencyList> vertexWithAdjacencyListComparator = new VertexWithAdjacencyListComparatorFactory();
-	
-	private static TypePairComparatorFactory<VertexWithRankAndDangling, VertexWithAdjacencyList> matchComparator =
-			new VertexWithRankDanglingToVertexWithAdjacencyListPairComparatorFactory();
-	
-	private static TypePairComparatorFactory<VertexWithRankAndDangling, VertexWithRank> coGroupComparator =
-			new VertexWithRankDanglingToVertexWithRankPairComparatorFactory();
-	
-
-//	public static void main(String[] args) throws Exception {
-//		String confPath = args.length >= 6 ? confPath = args[5] : PlayConstants.PLAY_DIR + "local-conf";
-//		
-//		GlobalConfiguration.loadConfiguration(confPath);
-//		Configuration conf = GlobalConfiguration.getConfiguration();
-//		
-//		JobGraph jobGraph = getJobGraph(args);
-//		JobGraphUtils.submit(jobGraph, conf);
-//	}
-		
-	public static JobGraph getJobGraph(String[] args) throws Exception {
-
-		int parallelism = 2;
-		String pageWithRankInputPath = ""; //"file://" + PlayConstants.PLAY_DIR + "test-inputs/danglingpagerank/pageWithRank";
-		String adjacencyListInputPath = ""; //"file://" + PlayConstants.PLAY_DIR +
-//			"test-inputs/danglingpagerank/adjacencylists";
-		String outputPath =  TestBaseUtils.constructTestURI(CustomCompensatableDanglingPageRankWithCombiner.class, "flink_iterations");
-		int minorConsumer = 2;
-		int matchMemory = 5;
-		int coGroupSortMemory = 5;
-		int numIterations = 25;
-		long numVertices = 5;
-		long numDanglingVertices = 1;
-
-		String failingWorkers = "1";
-		int failingIteration = 2;
-		double messageLoss = 0.75;
-
-		if (args.length >= 14) {
-			parallelism = Integer.parseInt(args[0]);
-			pageWithRankInputPath = args[1];
-			adjacencyListInputPath = args[2];
-			outputPath = args[3];
-			// [4] is config path
-			minorConsumer = Integer.parseInt(args[5]);
-			matchMemory = Integer.parseInt(args[6]);
-			coGroupSortMemory = Integer.parseInt(args[7]);
-			numIterations = Integer.parseInt(args[8]);
-			numVertices = Long.parseLong(args[9]);
-			numDanglingVertices = Long.parseLong(args[10]);
-			failingWorkers = args[11];
-			failingIteration = Integer.parseInt(args[12]);
-			messageLoss = Double.parseDouble(args[13]);
-		}
-
-		int totalMemoryConsumption = 3*minorConsumer + 2*coGroupSortMemory + matchMemory;
-
-		JobGraph jobGraph = new JobGraph("CompensatableDanglingPageRank");
-		
-		// --------------- the inputs ---------------------
-
-		// page rank input
-		InputFormatVertex pageWithRankInput = JobGraphUtils.createInput(new CustomImprovedDanglingPageRankInputFormat(),
-			pageWithRankInputPath, "DanglingPageWithRankInput", jobGraph, parallelism);
-		TaskConfig pageWithRankInputConfig = new TaskConfig(pageWithRankInput.getConfiguration());
-		pageWithRankInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
-		pageWithRankInputConfig.setOutputComparator(vertexWithRankAndDanglingComparator, 0);
-		pageWithRankInputConfig.setOutputSerializer(vertexWithRankAndDanglingSerializer);
-		pageWithRankInputConfig.setStubParameter("pageRank.numVertices", String.valueOf(numVertices));
-
-		// edges as adjacency list
-		InputFormatVertex adjacencyListInput = JobGraphUtils.createInput(new CustomImprovedAdjacencyListInputFormat(),
-			adjacencyListInputPath, "AdjancencyListInput", jobGraph, parallelism);
-		TaskConfig adjacencyListInputConfig = new TaskConfig(adjacencyListInput.getConfiguration());
-		adjacencyListInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
-		adjacencyListInputConfig.setOutputSerializer(vertexWithAdjacencyListSerializer);
-		adjacencyListInputConfig.setOutputComparator(vertexWithAdjacencyListComparator, 0);
-
-		// --------------- the head ---------------------
-		JobVertex head = JobGraphUtils.createTask(IterationHeadPactTask.class, "IterationHead", jobGraph,
-			parallelism);
-		TaskConfig headConfig = new TaskConfig(head.getConfiguration());
-		headConfig.setIterationId(ITERATION_ID);
-		
-		// initial input / partial solution
-		headConfig.addInputToGroup(0);
-		headConfig.setIterationHeadPartialSolutionOrWorksetInputIndex(0);
-		headConfig.setInputSerializer(vertexWithRankAndDanglingSerializer, 0);
-		headConfig.setInputComparator(vertexWithRankAndDanglingComparator, 0);
-		headConfig.setInputLocalStrategy(0, LocalStrategy.SORT);
-		headConfig.setRelativeMemoryInput(0, (double)minorConsumer/totalMemoryConsumption);
-		headConfig.setFilehandlesInput(0, NUM_FILE_HANDLES_PER_SORT);
-		headConfig.setSpillingThresholdInput(0, SORT_SPILL_THRESHOLD);
-		
-		// back channel / iterations
-		headConfig.setRelativeBackChannelMemory((double)minorConsumer/totalMemoryConsumption);
-		
-		// output into iteration
-		headConfig.setOutputSerializer(vertexWithRankAndDanglingSerializer);
-		headConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
-		headConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
-		
-		// final output
-		TaskConfig headFinalOutConfig = new TaskConfig(new Configuration());
-		headFinalOutConfig.setOutputSerializer(vertexWithRankAndDanglingSerializer);
-		headFinalOutConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
-		headConfig.setIterationHeadFinalOutputConfig(headFinalOutConfig);
-		
-		// the sync
-		headConfig.setIterationHeadIndexOfSyncOutput(3);
-		headConfig.setNumberOfIterations(numIterations);
-		
-		// the driver 
-		headConfig.setDriver(CollectorMapDriver.class);
-		headConfig.setDriverStrategy(DriverStrategy.COLLECTOR_MAP);
-		headConfig.setStubWrapper(new UserCodeClassWrapper<CustomCompensatingMap>(CustomCompensatingMap.class));
-		headConfig.setStubParameter("pageRank.numVertices", String.valueOf(numVertices));
-		headConfig.setStubParameter("compensation.failingWorker", failingWorkers);
-		headConfig.setStubParameter("compensation.failingIteration", String.valueOf(failingIteration));
-		headConfig.setStubParameter("compensation.messageLoss", String.valueOf(messageLoss));
-		headConfig.addIterationAggregator(CustomCompensatableDotProductCoGroup.AGGREGATOR_NAME, new PageRankStatsAggregator());
-
-		// --------------- the join ---------------------
-		
-		JobVertex intermediate = JobGraphUtils.createTask(IterationIntermediatePactTask.class,
-			"IterationIntermediate", jobGraph, parallelism);
-		TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration());
-		intermediateConfig.setIterationId(ITERATION_ID);
-//		intermediateConfig.setDriver(RepeatableHashjoinMatchDriverWithCachedBuildside.class);
-		intermediateConfig.setDriver(BuildSecondCachedJoinDriver.class);
-		intermediateConfig.setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
-		intermediateConfig.setRelativeMemoryDriver((double)matchMemory/totalMemoryConsumption);
-		intermediateConfig.addInputToGroup(0);
-		intermediateConfig.addInputToGroup(1);
-		intermediateConfig.setInputSerializer(vertexWithRankAndDanglingSerializer, 0);
-		intermediateConfig.setInputSerializer(vertexWithAdjacencyListSerializer, 1);
-		intermediateConfig.setDriverComparator(vertexWithRankAndDanglingComparator, 0);
-		intermediateConfig.setDriverComparator(vertexWithAdjacencyListComparator, 1);
-		intermediateConfig.setDriverPairComparator(matchComparator);
-		
-		intermediateConfig.setOutputSerializer(vertexWithRankSerializer);
-		intermediateConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
-		
-		intermediateConfig.setStubWrapper(new UserCodeClassWrapper<CustomCompensatableDotProductMatch>(CustomCompensatableDotProductMatch.class));
-		intermediateConfig.setStubParameter("pageRank.numVertices", String.valueOf(numVertices));
-		intermediateConfig.setStubParameter("compensation.failingWorker", failingWorkers);
-		intermediateConfig.setStubParameter("compensation.failingIteration", String.valueOf(failingIteration));
-		intermediateConfig.setStubParameter("compensation.messageLoss", String.valueOf(messageLoss));
-		
-		// the combiner and the output
-		TaskConfig combinerConfig = new TaskConfig(new Configuration());
-		combinerConfig.addInputToGroup(0);
-		combinerConfig.setInputSerializer(vertexWithRankSerializer, 0);
-		combinerConfig.setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE);
-		combinerConfig.setDriverComparator(vertexWithRankComparator, 0);
-		combinerConfig.setDriverComparator(vertexWithRankComparator, 1);
-		combinerConfig.setRelativeMemoryDriver((double)coGroupSortMemory/totalMemoryConsumption);
-		combinerConfig.setOutputSerializer(vertexWithRankSerializer);
-		combinerConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
-		combinerConfig.setOutputComparator(vertexWithRankComparator, 0);
-		combinerConfig.setStubWrapper(new UserCodeClassWrapper<CustomRankCombiner>(CustomRankCombiner.class));
-		intermediateConfig.addChainedTask(SynchronousChainedCombineDriver.class, combinerConfig, "Combiner");
-
-		// ---------------- the tail (co group) --------------------
-		
-		JobVertex tail = JobGraphUtils.createTask(IterationTailPactTask.class, "IterationTail", jobGraph,
-			parallelism);
-		TaskConfig tailConfig = new TaskConfig(tail.getConfiguration());
-		tailConfig.setIterationId(ITERATION_ID);
-		tailConfig.setIsWorksetUpdate();
-		
-		// inputs and driver
-		tailConfig.setDriver(CoGroupDriver.class);
-		tailConfig.setDriverStrategy(DriverStrategy.CO_GROUP);
-		tailConfig.addInputToGroup(0);
-		tailConfig.addInputToGroup(1);
-		tailConfig.setInputSerializer(vertexWithRankAndDanglingSerializer, 0);
-		tailConfig.setInputSerializer(vertexWithRankSerializer, 1);
-		tailConfig.setDriverComparator(vertexWithRankAndDanglingComparator, 0);
-		tailConfig.setDriverComparator(vertexWithRankComparator, 1);
-		tailConfig.setDriverPairComparator(coGroupComparator);
-		tailConfig.setInputAsynchronouslyMaterialized(0, true);
-		tailConfig.setRelativeInputMaterializationMemory(0, (double)minorConsumer/totalMemoryConsumption);
-		tailConfig.setInputLocalStrategy(1, LocalStrategy.SORT);
-		tailConfig.setInputComparator(vertexWithRankComparator, 1);
-		tailConfig.setRelativeMemoryInput(1, (double)coGroupSortMemory/totalMemoryConsumption);
-		tailConfig.setFilehandlesInput(1, NUM_FILE_HANDLES_PER_SORT);
-		tailConfig.setSpillingThresholdInput(1, SORT_SPILL_THRESHOLD);
-		tailConfig.addIterationAggregator(CustomCompensatableDotProductCoGroup.AGGREGATOR_NAME, new PageRankStatsAggregator());
-		
-		// output
-		tailConfig.setOutputSerializer(vertexWithRankAndDanglingSerializer);
-		
-		// the stub
-		tailConfig.setStubWrapper(new UserCodeClassWrapper<CustomCompensatableDotProductCoGroup>(CustomCompensatableDotProductCoGroup.class));
-		tailConfig.setStubParameter("pageRank.numVertices", String.valueOf(numVertices));
-		tailConfig.setStubParameter("pageRank.numDanglingVertices", String.valueOf(numDanglingVertices));
-		tailConfig.setStubParameter("compensation.failingWorker", failingWorkers);
-		tailConfig.setStubParameter("compensation.failingIteration", String.valueOf(failingIteration));
-		tailConfig.setStubParameter("compensation.messageLoss", String.valueOf(messageLoss));
-		
-		// --------------- the output ---------------------
-
-		OutputFormatVertex output = JobGraphUtils.createFileOutput(jobGraph, "FinalOutput", parallelism);
-		TaskConfig outputConfig = new TaskConfig(output.getConfiguration());
-		outputConfig.addInputToGroup(0);
-		outputConfig.setInputSerializer(vertexWithRankAndDanglingSerializer, 0);
-		outputConfig.setStubWrapper(new UserCodeClassWrapper<CustomPageWithRankOutFormat>(CustomPageWithRankOutFormat.class));
-		outputConfig.setStubParameter(FileOutputFormat.FILE_PARAMETER_KEY, outputPath);
-		
-		// --------------- the auxiliaries ---------------------
-
-		JobVertex sync = JobGraphUtils.createSync(jobGraph, parallelism);
-		TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
-		syncConfig.setNumberOfIterations(numIterations);
-		syncConfig.addIterationAggregator(CustomCompensatableDotProductCoGroup.AGGREGATOR_NAME, new PageRankStatsAggregator());
-		syncConfig.setConvergenceCriterion(CustomCompensatableDotProductCoGroup.AGGREGATOR_NAME, new DiffL1NormConvergenceCriterion());
-		syncConfig.setIterationId(ITERATION_ID);
-		
-		// --------------- the wiring ---------------------
-
-		JobGraphUtils.connect(pageWithRankInput, head, DistributionPattern.ALL_TO_ALL);
-
-		JobGraphUtils.connect(head, intermediate, DistributionPattern.POINTWISE);
-		intermediateConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
-		
-		JobGraphUtils.connect(adjacencyListInput, intermediate, DistributionPattern.ALL_TO_ALL);
-		
-		JobGraphUtils.connect(head, tail, DistributionPattern.POINTWISE);
-		JobGraphUtils.connect(intermediate, tail, DistributionPattern.ALL_TO_ALL);
-		tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
-		tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(1, parallelism);
-
-		JobGraphUtils.connect(head, output, DistributionPattern.POINTWISE);
-
-		JobGraphUtils.connect(head, sync, DistributionPattern.POINTWISE);
-		
-		SlotSharingGroup sharingGroup = new SlotSharingGroup();
-		pageWithRankInput.setSlotSharingGroup(sharingGroup);
-		adjacencyListInput.setSlotSharingGroup(sharingGroup);
-		head.setSlotSharingGroup(sharingGroup);
-		intermediate.setSlotSharingGroup(sharingGroup);
-		tail.setSlotSharingGroup(sharingGroup);
-		output.setSlotSharingGroup(sharingGroup);
-		sync.setSlotSharingGroup(sharingGroup);
-
-		tail.setStrictlyCoLocatedWith(head);
-		intermediate.setStrictlyCoLocatedWith(head);
-
-		return jobGraph;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductCoGroup.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductCoGroup.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductCoGroup.java
deleted file mode 100644
index faf777e..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductCoGroup.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.iterative.nephele.customdanglingpagerank;
-
-import java.util.Iterator;
-import java.util.Set;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.common.functions.CoGroupFunction;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.iterative.nephele.ConfigUtils;
-import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRank;
-import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRankAndDangling;
-import org.apache.flink.test.iterative.nephele.danglingpagerank.PageRankStats;
-import org.apache.flink.test.iterative.nephele.danglingpagerank.PageRankStatsAggregator;
-import org.apache.flink.util.Collector;
-
-/**
- * Co-group function of the custom compensatable dangling-PageRank test job.
- *
- * <p>For every key it takes the current rank entry from the first input and the partial
- * ranks from the second input, computes the new rank as
- * {@code BETA * sum(partial ranks) + dampingFactor + danglingRankFactor}, emits it, and
- * feeds per-vertex statistics into the aggregator obtained under {@link #AGGREGATOR_NAME}.
- */
-public class CustomCompensatableDotProductCoGroup extends AbstractRichFunction implements CoGroupFunction<VertexWithRankAndDangling, VertexWithRank, VertexWithRankAndDangling> {
-
-	private static final long serialVersionUID = 1L;
-
-	/** Name under which the statistics aggregator is looked up in the iteration runtime context. */
-	public static final String AGGREGATOR_NAME = "pagerank.aggregator";
-
-	private static final double BETA = 0.85;
-
-	/** Single output object that is re-populated and re-emitted by every {@code coGroup} call. */
-	private VertexWithRankAndDangling accumulator = new VertexWithRankAndDangling();
-
-	private PageRankStatsAggregator aggregator;
-
-	private long numVertices;
-
-	private long numDanglingVertices;
-
-	private double dampingFactor;
-
-	private double danglingRankFactor;
-
-	private int workerIndex;
-
-	private int currentIteration;
-
-	private int failingIteration;
-
-	private Set<Integer> failingWorkers;
-
-	/**
-	 * Captures the subtask index and superstep number, reads the stub parameters and
-	 * derives the constants used by {@code coGroup}.
-	 *
-	 * @param parameters configuration holding the {@code compensation.*} and {@code pageRank.*} keys
-	 */
-	@Override
-	public void open(Configuration parameters) throws Exception {
-		workerIndex = getRuntimeContext().getIndexOfThisSubtask();
-		currentIteration = getIterationRuntimeContext().getSuperstepNumber();
-
-		failingIteration = ConfigUtils.asInteger("compensation.failingIteration", parameters);
-		failingWorkers = ConfigUtils.asIntSet("compensation.failingWorker", parameters);
-		numVertices = ConfigUtils.asLong("pageRank.numVertices", parameters);
-		numDanglingVertices = ConfigUtils.asLong("pageRank.numDanglingVertices", parameters);
-
-		final double vertexCount = (double) numVertices;
-		dampingFactor = (1d - BETA) / vertexCount;
-
-		aggregator = getIterationRuntimeContext().getIterationAggregator(AGGREGATOR_NAME);
-
-		if (currentIteration != 1) {
-			// From the second superstep on, the dangling rank comes from the previous aggregate.
-			PageRankStats previousAggregate = getIterationRuntimeContext().getPreviousIterationAggregate(AGGREGATOR_NAME);
-			danglingRankFactor = BETA * previousAggregate.danglingRank() / vertexCount;
-		} else {
-			// First superstep: no previous aggregate is consulted, only the configured counts.
-			danglingRankFactor = BETA * (double) numDanglingVertices / (vertexCount * vertexCount);
-		}
-	}
-
-	/**
-	 * Computes and emits the new rank for one vertex.
-	 *
-	 * @param currentPageRankIterable current rank entries for the key; only the first one is used
-	 * @param partialRanks partial ranks for the key; their ranks are summed and counted
-	 * @param collector receives the (reused) output record
-	 * @throws IllegalStateException if the first input has no entry for the key
-	 */
-	@Override
-	public void coGroup(Iterable<VertexWithRankAndDangling> currentPageRankIterable, Iterable<VertexWithRank> partialRanks,
-			Collector<VertexWithRankAndDangling> collector)
-	{
-		final Iterator<VertexWithRankAndDangling> currentRanks = currentPageRankIterable.iterator();
-
-		if (!currentRanks.hasNext()) {
-			long missingVertex = partialRanks.iterator().next().getVertexID();
-			throw new IllegalStateException("No current page rank for vertex [" + missingVertex + "]!");
-		}
-
-		final VertexWithRankAndDangling current = currentRanks.next();
-
-		long edges = 0;
-		double summedRank = 0;
-		final Iterator<VertexWithRank> partials = partialRanks.iterator();
-		while (partials.hasNext()) {
-			summedRank += partials.next().getRank();
-			edges++;
-		}
-
-		final double rank = BETA * summedRank + dampingFactor + danglingRankFactor;
-
-		final double currentRank = current.getRank();
-		final boolean isDangling = current.isDangling();
-
-		// Only dangling vertices contribute to the dangling part of the statistics.
-		double danglingRankToAggregate = 0;
-		long danglingVerticesToAggregate = 0;
-		if (isDangling) {
-			danglingRankToAggregate = rank;
-			danglingVerticesToAggregate = 1;
-		}
-
-		final double diff = Math.abs(currentRank - rank);
-
-		aggregator.aggregate(diff, rank, danglingRankToAggregate, danglingVerticesToAggregate, 1, edges, summedRank, 0);
-
-		accumulator.setVertexID(current.getVertexID());
-		accumulator.setRank(rank);
-		accumulator.setDangling(isDangling);
-
-		collector.collect(accumulator);
-	}
-
-	/**
-	 * Resets the aggregator when the current superstep equals the configured failing
-	 * iteration and this subtask is one of the configured failing workers.
-	 * NOTE(review): presumably this simulates losing the worker's statistics; the reset
-	 * semantics live in PageRankStatsAggregator, which is not shown here.
-	 */
-	@Override
-	public void close() throws Exception {
-		if (currentIteration != failingIteration || !failingWorkers.contains(workerIndex)) {
-			return;
-		}
-		aggregator.reset();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductMatch.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductMatch.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductMatch.java
deleted file mode 100644
index cc20aba..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductMatch.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.iterative.nephele.customdanglingpagerank;
-
-import java.util.Random;
-import java.util.Set;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.iterative.nephele.ConfigUtils;
-import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithAdjacencyList;
-import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRank;
-import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRankAndDangling;
-import org.apache.flink.util.Collector;
-
-/**
- * Join function that spreads a vertex's rank evenly over the targets of its adjacency list,
- * emitting one {@link VertexWithRank} per target. When the current superstep and subtask
- * match the configured failing iteration and failing workers, each message is only emitted
- * if a random draw is at least the configured {@code compensation.messageLoss} value.
- */
-public class CustomCompensatableDotProductMatch extends AbstractRichFunction implements
-		FlatJoinFunction<VertexWithRankAndDangling, VertexWithAdjacencyList, VertexWithRank>
-{
-	private static final long serialVersionUID = 1L;
-
-	/** Output record, re-populated and re-emitted for every target. */
-	private VertexWithRank record = new VertexWithRank();
-
-	private Random random = new Random();
-
-	private double messageLoss;
-
-	/** True when this subtask is a failing worker in the failing iteration. */
-	private boolean isFailure;
-
-	/**
-	 * Determines whether this subtask drops messages in the current superstep and reads
-	 * the message loss parameter.
-	 *
-	 * @param parameters configuration holding the {@code compensation.*} keys
-	 */
-	@Override
-	public void open(Configuration parameters) throws Exception {
-		final int subtask = getRuntimeContext().getIndexOfThisSubtask();
-		final int superstep = getIterationRuntimeContext().getSuperstepNumber();
-		final int failingSuperstep = ConfigUtils.asInteger("compensation.failingIteration", parameters);
-		final Set<Integer> failingSubtasks = ConfigUtils.asIntSet("compensation.failingWorker", parameters);
-
-		isFailure = superstep == failingSuperstep && failingSubtasks.contains(subtask);
-		messageLoss = ConfigUtils.asDouble("compensation.messageLoss", parameters);
-	}
-
-	/**
-	 * Emits {@code rank / numTargets} to each of the first {@code numTargets} targets.
-	 *
-	 * @param pageWithRank vertex with its current rank
-	 * @param adjacencyList targets of that vertex
-	 * @param collector receives one record per emitted message
-	 */
-	@Override
-	public void join(VertexWithRankAndDangling pageWithRank, VertexWithAdjacencyList adjacencyList, Collector<VertexWithRank> collector)
-	throws Exception
-	{
-		final double rank = pageWithRank.getRank();
-		final long[] targets = adjacencyList.getTargets();
-		final int targetCount = adjacencyList.getNumTargets();
-
-		// Every message carries the same share, so the rank is set once up front.
-		record.setRank(rank / (double) targetCount);
-
-		for (int i = 0; i < targetCount; i++) {
-			record.setVertexID(targets[i]);
-			// Short-circuit keeps the random draw restricted to failure mode.
-			if (!isFailure || random.nextDouble() >= messageLoss) {
-				collector.collect(record);
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatingMap.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatingMap.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatingMap.java
deleted file mode 100644
index d1bf03c..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatingMap.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.iterative.nephele.customdanglingpagerank;
-
-import java.util.Set;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.common.functions.GenericCollectorMap;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.iterative.nephele.ConfigUtils;
-import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRankAndDangling;
-import org.apache.flink.test.iterative.nephele.danglingpagerank.PageRankStats;
-import org.apache.flink.util.Collector;
-
-/**
- * Map function that passes rank records through unchanged, except in the superstep right
- * after the configured failing iteration: there, records on a failing worker get the
- * uniform rank {@code 1 / numVertices}, and records on all other workers are multiplied
- * by a rescale factor derived from the previous superstep's aggregate.
- */
-@SuppressWarnings("deprecation")
-public class CustomCompensatingMap extends AbstractRichFunction implements GenericCollectorMap<VertexWithRankAndDangling, VertexWithRankAndDangling> {
-
-	private static final long serialVersionUID = 1L;
-
-	/** True in the superstep whose number is {@code failingIteration + 1}. */
-	private boolean isFailureIteration;
-
-	private boolean isFailingWorker;
-
-	private double uniformRank;
-
-	private double rescaleFactor;
-
-	/**
-	 * Reads the stub parameters and, from the second superstep on, derives the uniform
-	 * rank and the rescale factor from the previous iteration aggregate.
-	 *
-	 * @param parameters configuration holding the {@code compensation.*} and {@code pageRank.*} keys
-	 */
-	@Override
-	public void open(Configuration parameters) throws Exception {
-		final int superstep = getIterationRuntimeContext().getSuperstepNumber();
-		final int failingSuperstep = ConfigUtils.asInteger("compensation.failingIteration", parameters);
-		isFailureIteration = superstep == failingSuperstep + 1;
-
-		final int subtask = getRuntimeContext().getIndexOfThisSubtask();
-		final Set<Integer> failingSubtasks = ConfigUtils.asIntSet("compensation.failingWorker", parameters);
-		isFailingWorker = failingSubtasks.contains(subtask);
-
-		final long numVertices = ConfigUtils.asLong("pageRank.numVertices", parameters);
-
-		// The previous aggregate is only consulted after the first superstep; before that
-		// uniformRank and rescaleFactor keep their default value of 0.
-		if (superstep <= 1) {
-			return;
-		}
-
-		final PageRankStats stats = (PageRankStats) getIterationRuntimeContext().getPreviousIterationAggregate(CustomCompensatableDotProductCoGroup.AGGREGATOR_NAME);
-
-		uniformRank = 1d / (double) numVertices;
-		final double lostMassFactor = (numVertices - stats.numVertices()) / (double) numVertices;
-		rescaleFactor = (1 - lostMassFactor) / stats.rank();
-	}
-
-	/**
-	 * Forwards the record, overwriting its rank first when compensation applies.
-	 *
-	 * @param pageWithRank record to forward; its rank is modified in place
-	 * @param out receives the record
-	 */
-	@Override
-	public void map(VertexWithRankAndDangling pageWithRank, Collector<VertexWithRankAndDangling> out) throws Exception {
-
-		if (isFailureIteration) {
-			final double previousRank = pageWithRank.getRank();
-			pageWithRank.setRank(isFailingWorker ? uniformRank : previousRank * rescaleFactor);
-		}
-		out.collect(pageWithRank);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomImprovedAdjacencyListInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomImprovedAdjacencyListInputFormat.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomImprovedAdjacencyListInputFormat.java
deleted file mode 100644
index 863c081..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomImprovedAdjacencyListInputFormat.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.iterative.nephele.customdanglingpagerank;
-
-import org.apache.flink.api.common.io.DelimitedInputFormat;
-import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithAdjacencyList;
-import org.apache.flink.test.iterative.nephele.danglingpagerank.AsciiLongArrayView;
-
-/**
- * Input format that parses one delimited record into a {@link VertexWithAdjacencyList}:
- * the first element delivered by the {@link AsciiLongArrayView} becomes the vertex ID and
- * every following element becomes one target.
- */
-public class CustomImprovedAdjacencyListInputFormat extends DelimitedInputFormat<VertexWithAdjacencyList> {
-	private static final long serialVersionUID = 1L;
-
-	/** Smallest capacity the targets array is grown to. */
-	private static final int MIN_TARGETS_CAPACITY = 16;
-
-	private final AsciiLongArrayView arrayView = new AsciiLongArrayView();
-
-	/**
-	 * Fills {@code target} from the given byte range.
-	 *
-	 * @param target record to populate; its targets array is reused and replaced only when too small
-	 * @param bytes buffer containing the record
-	 * @param offset start of the record within {@code bytes}
-	 * @param numBytes length of the record
-	 * @return {@code target}, or {@code null} if the record is empty or yields no element
-	 * @throws RuntimeException wrapping any runtime failure raised while parsing
-	 */
-	@Override
-	public VertexWithAdjacencyList readRecord(VertexWithAdjacencyList target, byte[] bytes, int offset, int numBytes) {
-
-		if (numBytes == 0) {
-			return null;
-		}
-
-		arrayView.set(bytes, offset, numBytes);
-
-		long[] list = target.getTargets();
-
-		try {
-
-			int pos = 0;
-			while (arrayView.next()) {
-
-				if (pos == 0) {
-					target.setVertexID(arrayView.element());
-				} else {
-					final int index = pos - 1;
-					if (list.length <= index) {
-						// Grow the array and carry over the targets parsed so far; a bare
-						// allocation here would silently drop them.
-						final long[] grown = new long[list.length < MIN_TARGETS_CAPACITY ? MIN_TARGETS_CAPACITY : list.length * 2];
-						System.arraycopy(list, 0, grown, 0, index);
-						list = grown;
-						target.setTargets(list);
-					}
-					list[index] = arrayView.element();
-				}
-				pos++;
-			}
-
-			if (pos == 0) {
-				// No element at all: there is no vertex ID, so skip the record the same way
-				// an empty one is skipped instead of reporting a target count of -1.
-				return null;
-			}
-
-			target.setNumTargets(pos - 1);
-		} catch (RuntimeException e) {
-			throw new RuntimeException("Error parsing: " + arrayView.toString(), e);
-		}
-
-		return target;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomImprovedDanglingPageRankInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomImprovedDanglingPageRankInputFormat.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomImprovedDanglingPageRankInputFormat.java
deleted file mode 100644
index ec5b6d1..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomImprovedDanglingPageRankInputFormat.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.iterative.nephele.customdanglingpagerank;
-
-import org.apache.flink.api.common.io.DelimitedInputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.iterative.nephele.ConfigUtils;
-import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRankAndDangling;
-import org.apache.flink.test.iterative.nephele.danglingpagerank.AsciiLongArrayView;
-
-/**
- * Input format that parses one delimited record into a {@link VertexWithRankAndDangling}:
- * the first element is the vertex ID, an optional second element equal to
- * {@code DANGLING_MARKER} flags the vertex as dangling, and the rank is always set to
- * {@code 1 / pageRank.numVertices}.
- */
-public class CustomImprovedDanglingPageRankInputFormat extends DelimitedInputFormat<VertexWithRankAndDangling> {
-	private static final long serialVersionUID = 1L;
-
-	private static final long DANGLING_MARKER = 1L;
-
-	private AsciiLongArrayView arrayView = new AsciiLongArrayView();
-
-	/** Rank assigned to every record, computed once in {@code configure}. */
-	private double initialRank;
-
-	/**
-	 * Derives the initial rank from {@code pageRank.numVertices} and then delegates to the
-	 * superclass configuration.
-	 *
-	 * @param parameters configuration holding the {@code pageRank.numVertices} key
-	 */
-	@Override
-	public void configure(Configuration parameters) {
-		initialRank = 1.0 / ConfigUtils.asLong("pageRank.numVertices", parameters);
-		super.configure(parameters);
-	}
-
-	/**
-	 * Fills {@code target} from the given byte range.
-	 *
-	 * @param target record to populate
-	 * @param bytes buffer containing the record
-	 * @param offset start of the record within {@code bytes}
-	 * @param numBytes length of the record
-	 * @return {@code target}
-	 * @throws RuntimeException wrapping a {@link NumberFormatException} raised while parsing
-	 */
-	@Override
-	public VertexWithRankAndDangling readRecord(VertexWithRankAndDangling target, byte[] bytes, int offset, int numBytes) {
-
-		arrayView.set(bytes, offset, numBytes);
-
-		try {
-			// NOTE(review): the result of this first next() is not checked; what element()
-			// yields for a record without elements depends on AsciiLongArrayView - confirm.
-			arrayView.next();
-			target.setVertexID(arrayView.element());
-
-			// A missing second element means "not dangling".
-			final boolean dangling = arrayView.next() && arrayView.element() == DANGLING_MARKER;
-			target.setDangling(dangling);
-
-		} catch (NumberFormatException e) {
-			throw new RuntimeException("Error parsing " + arrayView.toString(), e);
-		}
-
-		target.setRank(initialRank);
-		return target;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomPageWithRankOutFormat.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomPageWithRankOutFormat.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomPageWithRankOutFormat.java
deleted file mode 100644
index b4ffb25..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomPageWithRankOutFormat.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.iterative.nephele.customdanglingpagerank;
-
-import com.google.common.base.Charsets;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.io.FileOutputFormat;
-import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRankAndDangling;
-
-/**
- * Output format that writes each record as {@code <vertexID>\t<rank>\n}, encoded as UTF-8.
- */
-public class CustomPageWithRankOutFormat extends FileOutputFormat<VertexWithRankAndDangling> {
-	private static final long serialVersionUID = 1L;
-
-	/** Scratch buffer, cleared at the start of every {@code writeRecord} call. */
-	private final StringBuilder buffer = new StringBuilder();
-
-	/**
-	 * Formats the record as one text line and writes its bytes to the output stream.
-	 *
-	 * @param record record whose vertex ID and rank are written
-	 * @throws IOException if writing to the stream fails
-	 */
-	@Override
-	public void writeRecord(VertexWithRankAndDangling record) throws IOException {
-		buffer.setLength(0);
-		buffer.append(record.getVertexID())
-			.append('\t')
-			.append(record.getRank())
-			.append('\n');
-
-		stream.write(buffer.toString().getBytes(Charsets.UTF_8));
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java
deleted file mode 100644
index e2a160d..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.iterative.nephele.customdanglingpagerank;
-
-import java.util.Iterator;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.common.functions.GroupCombineFunction;
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRank;
-import org.apache.flink.util.Collector;
-
-
-/**
- * Combiner that collapses a group of {@link VertexWithRank} records into a single record
- * carrying the vertex ID of the first record and the sum of all ranks. Only
- * {@code combine} is supported; {@code reduce} always throws.
- */
-public class CustomRankCombiner extends AbstractRichFunction implements GroupReduceFunction<VertexWithRank, VertexWithRank>,
-		GroupCombineFunction<VertexWithRank, VertexWithRank>
-{
-	private static final long serialVersionUID = 1L;
-
-	/** Output record, re-populated and re-emitted by every {@code combine} call. */
-	private final VertexWithRank accumulator = new VertexWithRank();
-
-	/**
-	 * Not supported.
-	 *
-	 * @throws UnsupportedOperationException always
-	 */
-	@Override
-	public void reduce(Iterable<VertexWithRank> records, Collector<VertexWithRank> out) throws Exception {
-		throw new UnsupportedOperationException();
-	}
-
-	/**
-	 * Emits one record with the first record's vertex ID and the summed rank of the group.
-	 *
-	 * @param recordsIterable records of one group; the first element is read unconditionally
-	 * @param out receives the combined record
-	 */
-	@Override
-	public void combine(Iterable<VertexWithRank> recordsIterable, Collector<VertexWithRank> out) throws Exception {
-		final Iterator<VertexWithRank> it = recordsIterable.iterator();
-
-		// Read both fields of the first element before the iterator is advanced again.
-		final VertexWithRank first = it.next();
-		this.accumulator.setVertexID(first.getVertexID());
-		double rankSum = first.getRank();
-
-		for (; it.hasNext(); ) {
-			rankSum += it.next().getRank();
-		}
-
-		this.accumulator.setRank(rankSum);
-		out.collect(this.accumulator);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyList.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyList.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyList.java
deleted file mode 100644
index d19dd59..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyList.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.iterative.nephele.customdanglingpagerank.types;
-
-
-/**
- * Mutable holder for a vertex ID and the IDs of the vertices it points to.
- *
- * <p>The targets are kept in a {@code long[]} buffer plus a separate count. The count is never
- * derived from the array: it is whatever was last passed to {@link #setNumTargets(int)}
- * (initially 0), and {@link #toString()} prints only that many leading entries.
- */
-public final class VertexWithAdjacencyList {
-
-	/** Shared zero-length buffer used by the no-arg constructor. */
-	private static final long[] EMPTY = new long[0];
-
-	private long vertexID;
-
-	private long[] targets;
-
-	private int numTargets;
-
-	/** Creates a vertex with ID 0, an empty target buffer and a target count of 0. */
-	public VertexWithAdjacencyList() {
-		this.targets = EMPTY;
-	}
-
-	/**
-	 * Creates a vertex with the given ID and target buffer.
-	 *
-	 * <p>The target count is left at 0; callers have to invoke {@link #setNumTargets(int)}
-	 * to declare how many entries of {@code targets} are in use.
-	 *
-	 * @param vertexID the ID of this vertex
-	 * @param targets the buffer holding the target IDs (stored by reference, not copied)
-	 */
-	public VertexWithAdjacencyList(long vertexID, long[] targets) {
-		this.vertexID = vertexID;
-		this.targets = targets;
-	}
-
-	/** @return the ID of this vertex */
-	public long getVertexID() {
-		return this.vertexID;
-	}
-
-	/** @param vertexID the new ID of this vertex */
-	public void setVertexID(long vertexID) {
-		this.vertexID = vertexID;
-	}
-
-	/** @return the target buffer itself (no defensive copy) */
-	public long[] getTargets() {
-		return this.targets;
-	}
-
-	/** @param targets the new target buffer (stored by reference); the count is not touched */
-	public void setTargets(long[] targets) {
-		this.targets = targets;
-	}
-
-	/** @return the number of buffer entries currently declared as in use */
-	public int getNumTargets() {
-		return this.numTargets;
-	}
-
-	/** @param numTargets the number of leading buffer entries to treat as in use */
-	public void setNumTargets(int numTargets) {
-		this.numTargets = numTargets;
-	}
-
-	/**
-	 * Renders the vertex as {@code "<id> : <t0>,<t1>,..."}, listing the first
-	 * {@link #getNumTargets()} buffer entries.
-	 */
-	@Override
-	public String toString() {
-		final StringBuilder text = new StringBuilder(32);
-		text.append(this.vertexID).append(" : ");
-
-		final int count = this.numTargets;
-		if (count > 0) {
-			// first entry without separator, every further entry prefixed by a comma
-			text.append(this.targets[0]);
-			for (int pos = 1; pos < count; pos++) {
-				text.append(',').append(this.targets[pos]);
-			}
-		}
-		return text.toString();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListComparator.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListComparator.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListComparator.java
deleted file mode 100644
index 7d11530..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListComparator.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.iterative.nephele.customdanglingpagerank.types;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.base.LongComparator;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.MemorySegment;
-
-/**
- * Comparator for {@link VertexWithAdjacencyList} records that uses the vertex ID as the only key
- * and orders records ascending by that ID.
- *
- * <p>All comparisons are done with explicit {@code <} / {@code >} checks rather than by
- * subtracting the two IDs, because a {@code long} subtraction overflows when the operands are
- * far apart and would then report the wrong order.
- */
-public final class VertexWithAdjacencyListComparator extends TypeComparator<VertexWithAdjacencyList> {
-
-	private static final long serialVersionUID = 1L;
-
-	/** Vertex ID captured by the last call to {@link #setReference(VertexWithAdjacencyList)}. */
-	private long reference;
-
-	@SuppressWarnings("rawtypes")
-	private TypeComparator[] comparators = new TypeComparator[]{new LongComparator(true)};
-
-	/** Hashes the record by folding the upper and lower 32 bits of its vertex ID. */
-	@Override
-	public int hash(VertexWithAdjacencyList record) {
-		final long value = record.getVertexID();
-		return 43 + (int) (value ^ (value >>> 32));
-	}
-
-	/** Remembers the vertex ID of {@code toCompare} as the reference key. */
-	@Override
-	public void setReference(VertexWithAdjacencyList toCompare) {
-		this.reference = toCompare.getVertexID();
-	}
-
-	/** @return {@code true} if the candidate's vertex ID equals the reference key */
-	@Override
-	public boolean equalToReference(VertexWithAdjacencyList candidate) {
-		return candidate.getVertexID() == this.reference;
-	}
-
-	/**
-	 * Compares the reference of {@code referencedComparator} against this comparator's reference.
-	 *
-	 * @return -1, 0 or 1 if the other comparator's reference is smaller than, equal to or larger
-	 *         than this comparator's reference
-	 */
-	@Override
-	public int compareToReference(TypeComparator<VertexWithAdjacencyList> referencedComparator) {
-		final long other = ((VertexWithAdjacencyListComparator) referencedComparator).reference;
-		return compareLongs(other, this.reference);
-	}
-
-	/** Compares two records by vertex ID. */
-	@Override
-	public int compare(VertexWithAdjacencyList first, VertexWithAdjacencyList second) {
-		return compareLongs(first.getVertexID(), second.getVertexID());
-	}
-
-	/**
-	 * Compares two serialized records by reading the leading {@code long} (the vertex ID) from
-	 * each input, {@code source1} first.
-	 */
-	@Override
-	public int compareSerialized(DataInputView source1, DataInputView source2) throws IOException {
-		final long firstId = source1.readLong();
-		final long secondId = source2.readLong();
-		return compareLongs(firstId, secondId);
-	}
-
-	@Override
-	public boolean supportsNormalizedKey() {
-		return true;
-	}
-
-	/** @return 8, the number of bytes of the full normalized key (one {@code long}) */
-	@Override
-	public int getNormalizeKeyLen() {
-		return 8;
-	}
-
-	/** @return {@code true} if fewer than 8 key bytes are used, i.e. the key is only a prefix */
-	@Override
-	public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
-		return keyBytes < 8;
-	}
-
-	/**
-	 * Writes {@code len} bytes of the normalized key for the record's vertex ID.
-	 *
-	 * <p>The key is the vertex ID with its sign bit flipped, written most significant byte
-	 * first. If {@code len} is smaller than 8 only the leading (most significant) bytes are
-	 * written; if it is larger, the remaining bytes are zero-padded.
-	 *
-	 * @param record the record whose vertex ID is encoded
-	 * @param target the memory segment to write into
-	 * @param offset the position in {@code target} of the first key byte
-	 * @param len the number of key bytes to write; nothing is written for {@code len <= 0}
-	 */
-	@Override
-	public void putNormalizedKey(VertexWithAdjacencyList record, MemorySegment target, int offset, int len) {
-		// Subtracting Long.MIN_VALUE flips the sign bit, so that the big-endian bytes of the
-		// result order like the signed values when compared as unsigned bytes.
-		final long value = record.getVertexID() - Long.MIN_VALUE;
-
-		// see IntValue for an explanation of the logic
-		if (len == 8) {
-			// default case, full normalized key
-			target.putLongBigEndian(offset, value);
-		}
-		else if (len <= 0) {
-			// no key bytes requested, nothing to write
-		}
-		else if (len < 8) {
-			// Prefix of the key: emit the most significant bytes first. The key is 8 bytes
-			// wide, so byte i sits at bit offset (7 - i) * 8.
-			for (int i = 0; i < len; i++) {
-				target.put(offset + i, (byte) (value >>> ((7 - i) << 3)));
-			}
-		}
-		else {
-			// more space than key bytes: full key followed by zero padding
-			target.putLongBigEndian(offset, value);
-			for (int i = 8; i < len; i++) {
-				target.put(offset + i, (byte) 0);
-			}
-		}
-	}
-
-	@Override
-	public boolean invertNormalizedKey() {
-		return false;
-	}
-
-	@Override
-	public boolean supportsSerializationWithKeyNormalization() {
-		return false;
-	}
-
-	/**
-	 * Not supported, see {@link #supportsSerializationWithKeyNormalization()}.
-	 *
-	 * @throws UnsupportedOperationException always
-	 */
-	@Override
-	public void writeWithKeyNormalization(VertexWithAdjacencyList record, DataOutputView target) {
-		throw new UnsupportedOperationException();
-	}
-
-	/**
-	 * Not supported, see {@link #supportsSerializationWithKeyNormalization()}.
-	 *
-	 * @throws UnsupportedOperationException always
-	 */
-	@Override
-	public VertexWithAdjacencyList readWithKeyDenormalization(VertexWithAdjacencyList reuse, DataInputView source) {
-		throw new UnsupportedOperationException();
-	}
-
-	/** @return a fresh comparator; the current reference is not carried over */
-	@Override
-	public VertexWithAdjacencyListComparator duplicate() {
-		return new VertexWithAdjacencyListComparator();
-	}
-
-	/**
-	 * Stores the record's vertex ID at {@code target[index]}.
-	 *
-	 * @return 1, the number of keys written
-	 */
-	@Override
-	public int extractKeys(Object record, Object[] target, int index) {
-		target[index] = ((VertexWithAdjacencyList) record).getVertexID();
-		return 1;
-	}
-
-	@Override
-	@SuppressWarnings("rawtypes")
-	public TypeComparator[] getFlatComparators() {
-		return comparators;
-	}
-
-	/** Overflow-safe three-way comparison of two {@code long} values. */
-	private static int compareLongs(long left, long right) {
-		return left < right ? -1 : (left > right ? 1 : 0);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListComparatorFactory.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListComparatorFactory.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListComparatorFactory.java
deleted file mode 100644
index 0bc3263..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListComparatorFactory.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.iterative.nephele.customdanglingpagerank.types;
-
-import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
-import org.apache.flink.configuration.Configuration;
-
-/**
- * {@link TypeComparatorFactory} that hands out {@link VertexWithAdjacencyListComparator}
- * instances. The factory has no parameters, so both configuration hooks do nothing.
- */
-public final class VertexWithAdjacencyListComparatorFactory implements TypeComparatorFactory<VertexWithAdjacencyList> {
-
-	/** @return a newly created comparator on every call */
-	@Override
-	public VertexWithAdjacencyListComparator createComparator() {
-		final VertexWithAdjacencyListComparator comparator = new VertexWithAdjacencyListComparator();
-		return comparator;
-	}
-
-	@Override
-	public void writeParametersToConfig(Configuration config) {
-		// no parameters to write
-	}
-
-	@Override
-	public void readParametersFromConfig(Configuration config, ClassLoader cl) throws ClassNotFoundException {
-		// no parameters to read
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListSerializer.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListSerializer.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListSerializer.java
deleted file mode 100644
index 751ced3..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListSerializer.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.iterative.nephele.customdanglingpagerank.types;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-/**
- * Serializer for {@link VertexWithAdjacencyList}.
- *
- * <p>Binary layout, as written by {@link #serialize(VertexWithAdjacencyList, DataOutputView)}:
- * the vertex ID ({@code long}), the number of targets ({@code int}), then that many target IDs
- * ({@code long} each). Only the first {@code getNumTargets()} entries of the target buffer are
- * written; unused buffer capacity is not part of the serialized form.
- */
-public final class VertexWithAdjacencyListSerializer extends TypeSerializerSingleton<VertexWithAdjacencyList> {
-
-	private static final long serialVersionUID = 1L;
-
-	/** @return {@code false}; records are mutated in place by the reuse-variants below */
-	@Override
-	public boolean isImmutableType() {
-		return false;
-	}
-
-	/** @return a new, empty record */
-	@Override
-	public VertexWithAdjacencyList createInstance() {
-		return new VertexWithAdjacencyList();
-	}
-
-	/**
-	 * Creates an independent copy of {@code from} whose target buffer is exactly
-	 * {@code from.getNumTargets()} entries long.
-	 */
-	@Override
-	public VertexWithAdjacencyList copy(VertexWithAdjacencyList from) {
-		final int count = from.getNumTargets();
-		final long[] duplicate = new long[count];
-		System.arraycopy(from.getTargets(), 0, duplicate, 0, count);
-
-		final VertexWithAdjacencyList result = new VertexWithAdjacencyList(from.getVertexID(), duplicate);
-		result.setNumTargets(count);
-		return result;
-	}
-
-	/**
-	 * Copies {@code from} into {@code reuse}. The buffer of {@code reuse} is replaced only when
-	 * it is shorter than the buffer of {@code from}.
-	 *
-	 * @return {@code reuse}
-	 */
-	@Override
-	public VertexWithAdjacencyList copy(VertexWithAdjacencyList from, VertexWithAdjacencyList reuse) {
-		final int requiredCapacity = from.getTargets().length;
-		if (reuse.getTargets().length < requiredCapacity) {
-			reuse.setTargets(new long[requiredCapacity]);
-		}
-
-		final int count = from.getNumTargets();
-		reuse.setVertexID(from.getVertexID());
-		reuse.setNumTargets(count);
-		System.arraycopy(from.getTargets(), 0, reuse.getTargets(), 0, count);
-		return reuse;
-	}
-
-	/**
-	 * @return -1 (NOTE(review): presumably the serializer convention for variable-length
-	 *         records - confirm against the base class contract)
-	 */
-	@Override
-	public int getLength() {
-		return -1;
-	}
-
-	/** Writes the vertex ID, the target count and then the first {@code count} target IDs. */
-	@Override
-	public void serialize(VertexWithAdjacencyList record, DataOutputView target) throws IOException {
-		target.writeLong(record.getVertexID());
-
-		final long[] ids = record.getTargets();
-		final int count = record.getNumTargets();
-		target.writeInt(count);
-
-		for (int pos = 0; pos < count; pos++) {
-			target.writeLong(ids[pos]);
-		}
-	}
-
-	/** Reads one record into a newly created instance. */
-	@Override
-	public VertexWithAdjacencyList deserialize(DataInputView source) throws IOException {
-		final VertexWithAdjacencyList fresh = new VertexWithAdjacencyList();
-		return deserialize(fresh, source);
-	}
-
-	/**
-	 * Reads one record into {@code target}, growing its buffer only if it is too small for the
-	 * incoming number of targets.
-	 *
-	 * @return {@code target}
-	 */
-	@Override
-	public VertexWithAdjacencyList deserialize(VertexWithAdjacencyList target, DataInputView source) throws IOException {
-		target.setVertexID(source.readLong());
-
-		final int count = source.readInt();
-		long[] buffer = target.getTargets();
-		if (buffer.length < count) {
-			// existing buffer cannot hold all incoming targets
-			buffer = new long[count];
-			target.setTargets(buffer);
-		}
-		target.setNumTargets(count);
-
-		for (int pos = 0; pos < count; pos++) {
-			buffer[pos] = source.readLong();
-		}
-		return target;
-	}
-
-	/**
-	 * Copies one serialized record from {@code source} to {@code target} without materializing
-	 * it: 8 bytes of vertex ID, the target count, then 8 bytes per target.
-	 */
-	@Override
-	public void copy(DataInputView source, DataOutputView target) throws IOException {
-		// vertex ID
-		target.write(source, 8);
-
-		// the count has to be decoded to know how many bytes follow
-		final int count = source.readInt();
-		target.writeInt(count);
-
-		// target IDs, one long each
-		final int payloadBytes = count * 8;
-		target.write(source, payloadBytes);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListSerializerFactory.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListSerializerFactory.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListSerializerFactory.java
deleted file mode 100644
index cdd50a6..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListSerializerFactory.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.iterative.nephele.customdanglingpagerank.types;
-
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.configuration.Configuration;
-
-
-/**
- * {@link TypeSerializerFactory} for {@link VertexWithAdjacencyList}. Every factory returns the
- * same shared {@link VertexWithAdjacencyListSerializer}; the factory has no parameters, so the
- * configuration hooks do nothing and all instances of this class are equal to each other.
- */
-public final class VertexWithAdjacencyListSerializerFactory implements TypeSerializerFactory<VertexWithAdjacencyList> {
-
-	/** The one serializer handed out by all factories. */
-	private static final VertexWithAdjacencyListSerializer INSTANCE = new VertexWithAdjacencyListSerializer();
-
-	@Override
-	public void writeParametersToConfig(Configuration config) {
-		// no parameters to write
-	}
-
-	@Override
-	public void readParametersFromConfig(Configuration config, ClassLoader cl) throws ClassNotFoundException {
-		// no parameters to read
-	}
-
-	/** @return the shared serializer instance */
-	@Override
-	public VertexWithAdjacencyListSerializer getSerializer() {
-		return INSTANCE;
-	}
-
-	/** @return {@code VertexWithAdjacencyList.class} */
-	@Override
-	public Class<VertexWithAdjacencyList> getDataType() {
-		return VertexWithAdjacencyList.class;
-	}
-
-	// --------------------------------------------------------------------------------------------
-
-	/** @return the constant 1, consistent with all factories being equal */
-	@Override
-	public int hashCode() {
-		return 1;
-	}
-
-	/** @return {@code true} for any other instance of this (final) class */
-	@Override
-	public boolean equals(Object obj) {
-		// The class is final, so instanceof matches exactly the objects of this class.
-		return obj instanceof VertexWithAdjacencyListSerializerFactory;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRank.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRank.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRank.java
deleted file mode 100644
index e62185e..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRank.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.iterative.nephele.customdanglingpagerank.types;
-
-import java.io.Serializable;
-
-
-/**
- * Mutable, serializable pair of a vertex ID and a rank value.
- */
-public final class VertexWithRank implements Serializable {
-
-	private static final long serialVersionUID = 1L;
-
-	private long vertexID;
-
-	private double rank;
-
-	/** Creates a record with vertex ID 0 and rank 0.0. */
-	public VertexWithRank() {
-	}
-
-	/**
-	 * @param vertexID the ID of the vertex
-	 * @param rank the rank assigned to the vertex
-	 */
-	public VertexWithRank(long vertexID, double rank) {
-		this.vertexID = vertexID;
-		this.rank = rank;
-	}
-
-	/** @return the ID of the vertex */
-	public long getVertexID() {
-		return this.vertexID;
-	}
-
-	/** @param vertexID the new ID of the vertex */
-	public void setVertexID(long vertexID) {
-		this.vertexID = vertexID;
-	}
-
-	/** @return the rank of the vertex */
-	public double getRank() {
-		return this.rank;
-	}
-
-	/** @param rank the new rank of the vertex */
-	public void setRank(double rank) {
-		this.rank = rank;
-	}
-
-	/** Renders the record as {@code "<vertexID> - <rank>"}. */
-	@Override
-	public String toString() {
-		final StringBuilder text = new StringBuilder();
-		text.append(this.vertexID);
-		text.append(" - ");
-		text.append(this.rank);
-		return text.toString();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDangling.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDangling.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDangling.java
deleted file mode 100644
index 4cdb594..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDangling.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.iterative.nephele.customdanglingpagerank.types;
-
-import java.io.Serializable;
-
-
-/**
- * Mutable, serializable record of a vertex ID, its rank and a flag telling whether the vertex
- * is dangling.
- */
-public final class VertexWithRankAndDangling implements Serializable {
-
-	private static final long serialVersionUID = 1L;
-
-	private long vertexID;
-
-	private double rank;
-
-	private boolean dangling;
-
-	/** Creates a record with vertex ID 0, rank 0.0 and the dangling flag cleared. */
-	public VertexWithRankAndDangling() {
-	}
-
-	/**
-	 * @param vertexID the ID of the vertex
-	 * @param rank the rank assigned to the vertex
-	 * @param dangling whether the vertex is flagged as dangling
-	 */
-	public VertexWithRankAndDangling(long vertexID, double rank, boolean dangling) {
-		this.vertexID = vertexID;
-		this.rank = rank;
-		this.dangling = dangling;
-	}
-
-	/** @return the ID of the vertex */
-	public long getVertexID() {
-		return this.vertexID;
-	}
-
-	/** @param vertexID the new ID of the vertex */
-	public void setVertexID(long vertexID) {
-		this.vertexID = vertexID;
-	}
-
-	/** @return the rank of the vertex */
-	public double getRank() {
-		return this.rank;
-	}
-
-	/** @param rank the new rank of the vertex */
-	public void setRank(double rank) {
-		this.rank = rank;
-	}
-
-	/** @return whether the vertex is flagged as dangling */
-	public boolean isDangling() {
-		return this.dangling;
-	}
-
-	/** @param dangling the new value of the dangling flag */
-	public void setDangling(boolean dangling) {
-		this.dangling = dangling;
-	}
-
-	/**
-	 * Renders the record as {@code "<vertexID> - <rank>"}, followed by {@code " (dangling)"}
-	 * when the flag is set.
-	 */
-	@Override
-	public String toString() {
-		final StringBuilder text = new StringBuilder();
-		text.append(this.vertexID).append(" - ").append(this.rank);
-		if (this.dangling) {
-			text.append(" (dangling)");
-		}
-		return text.toString();
-	}
-}