You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/30 23:44:37 UTC

[2/5] flink git commit: [FLINK-1681] [tests] Remove outdated 'nephele' iteration tests.

http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/PageRankStats.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/PageRankStats.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/PageRankStats.java
deleted file mode 100644
index e093a48..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/PageRankStats.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.iterative.nephele.danglingpagerank;
-
-import java.io.IOException;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.types.Value;
-
-public class PageRankStats implements Value {
-	private static final long serialVersionUID = 1L;
-
-	private double diff;
-
-	private double rank;
-
-	private double danglingRank;
-
-	private long numDanglingVertices;
-
-	private long numVertices;
-
-	private long edges;
-
-	private double summedRank;
-
-	private double finalDiff;
-
-	public PageRankStats() {
-	}
-
-	public PageRankStats(double diff, double rank, double danglingRank, long numDanglingVertices, long numVertices,
-			long edges, double summedRank, double finalDiff) {
-		this.diff = diff;
-		this.rank = rank;
-		this.danglingRank = danglingRank;
-		this.numDanglingVertices = numDanglingVertices;
-		this.numVertices = numVertices;
-		this.edges = edges;
-		this.summedRank = summedRank;
-		this.finalDiff = finalDiff;
-	}
-
-	public double diff() {
-		return diff;
-	}
-
-	public double rank() {
-		return rank;
-	}
-
-	public double danglingRank() {
-		return danglingRank;
-	}
-
-	public long numDanglingVertices() {
-		return numDanglingVertices;
-	}
-
-	public long numVertices() {
-		return numVertices;
-	}
-
-	public long edges() {
-		return edges;
-	}
-
-	public double summedRank() {
-		return summedRank;
-	}
-
-	public double finalDiff() {
-		return finalDiff;
-	}
-
-	@Override
-	public void write(DataOutputView out) throws IOException {
-		out.writeDouble(diff);
-		out.writeDouble(rank);
-		out.writeDouble(danglingRank);
-		out.writeLong(numDanglingVertices);
-		out.writeLong(numVertices);
-		out.writeLong(edges);
-		out.writeDouble(summedRank);
-		out.writeDouble(finalDiff);
-	}
-
-	@Override
-	public void read(DataInputView in) throws IOException {
-		diff = in.readDouble();
-		rank = in.readDouble();
-		danglingRank = in.readDouble();
-		numDanglingVertices = in.readLong();
-		numVertices = in.readLong();
-		edges = in.readLong();
-		summedRank = in.readDouble();
-		finalDiff = in.readDouble();
-	}
-
-	@Override
-	public String toString() {
-		return "PageRankStats: diff [" + diff + "], rank [" + rank + "], danglingRank [" + danglingRank +
-			"], numDanglingVertices [" + numDanglingVertices + "], numVertices [" + numVertices + "], edges [" + edges +
-			"], summedRank [" + summedRank + "], finalDiff [" + finalDiff + "]";
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/PageRankStatsAggregator.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/PageRankStatsAggregator.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/PageRankStatsAggregator.java
deleted file mode 100644
index 4b41e45..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/PageRankStatsAggregator.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.iterative.nephele.danglingpagerank;
-
-import org.apache.flink.api.common.aggregators.Aggregator;
-
-@SuppressWarnings("serial")
-public class PageRankStatsAggregator implements Aggregator<PageRankStats> {
-
-	private double diff = 0;
-
-	private double rank = 0;
-
-	private double danglingRank = 0;
-
-	private long numDanglingVertices = 0;
-
-	private long numVertices = 0;
-
-	private long edges = 0;
-
-	private double summedRank = 0;
-
-	private double finalDiff = 0;
-
-	@Override
-	public PageRankStats getAggregate() {
-		return new PageRankStats(diff, rank, danglingRank, numDanglingVertices, numVertices, edges, summedRank,
-			finalDiff);
-	}
-
-	public void aggregate(double diffDelta, double rankDelta, double danglingRankDelta, long danglingVerticesDelta,
-			long verticesDelta, long edgesDelta, double summedRankDelta, double finalDiffDelta) {
-		diff += diffDelta;
-		rank += rankDelta;
-		danglingRank += danglingRankDelta;
-		numDanglingVertices += danglingVerticesDelta;
-		numVertices += verticesDelta;
-		edges += edgesDelta;
-		summedRank += summedRankDelta;
-		finalDiff += finalDiffDelta;
-	}
-
-	@Override
-	public void aggregate(PageRankStats pageRankStats) {
-		diff += pageRankStats.diff();
-		rank += pageRankStats.rank();
-		danglingRank += pageRankStats.danglingRank();
-		numDanglingVertices += pageRankStats.numDanglingVertices();
-		numVertices += pageRankStats.numVertices();
-		edges += pageRankStats.edges();
-		summedRank += pageRankStats.summedRank();
-		finalDiff += pageRankStats.finalDiff();
-	}
-
-	@Override
-	public void reset() {
-		diff = 0;
-		rank = 0;
-		danglingRank = 0;
-		numDanglingVertices = 0;
-		numVertices = 0;
-		edges = 0;
-		summedRank = 0;
-		finalDiff = 0;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/PageWithRankOutFormat.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/PageWithRankOutFormat.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/PageWithRankOutFormat.java
deleted file mode 100644
index 5cd520f..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/PageWithRankOutFormat.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.iterative.nephele.danglingpagerank;
-
-import com.google.common.base.Charsets;
-
-import java.io.IOException;
-
-import org.apache.flink.api.java.record.io.FileOutputFormat;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.Record;
-
-public class PageWithRankOutFormat extends FileOutputFormat {
-  private static final long serialVersionUID = 1L;
-
-  private final StringBuilder buffer = new StringBuilder();
-
-  @Override
-  public void writeRecord(Record record) throws IOException {
-    buffer.setLength(0);
-    buffer.append(record.getField(0, LongValue.class).toString());
-    buffer.append('\t');
-    buffer.append(record.getField(1, DoubleValue.class).toString());
-    buffer.append('\n');
-
-    byte[] bytes = buffer.toString().getBytes(Charsets.UTF_8);
-    stream.write(bytes);
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/DanglingPageRank.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/DanglingPageRank.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/DanglingPageRank.java
deleted file mode 100644
index 80ba91a..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/DanglingPageRank.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.recordJobs.graph;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.Program;
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.java.record.operators.BulkIteration;
-import org.apache.flink.api.java.record.operators.CoGroupOperator;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.JoinOperator;
-import org.apache.flink.test.recordJobs.graph.pageRankUtil.DanglingPageRankInputFormat;
-import org.apache.flink.test.recordJobs.graph.pageRankUtil.DiffL1NormConvergenceCriterion;
-import org.apache.flink.test.recordJobs.graph.pageRankUtil.DotProductCoGroup;
-import org.apache.flink.test.recordJobs.graph.pageRankUtil.DotProductMatch;
-import org.apache.flink.test.recordJobs.graph.pageRankUtil.ImprovedAdjacencyListInputFormat;
-import org.apache.flink.test.recordJobs.graph.pageRankUtil.PageRankStatsAggregator;
-import org.apache.flink.test.recordJobs.graph.pageRankUtil.PageWithRankOutFormat;
-import org.apache.flink.types.LongValue;
-
-@SuppressWarnings("deprecation")
-public class DanglingPageRank implements Program, ProgramDescription {
-	
-	private static final long serialVersionUID = 1L;
-	
-	public static final String NUM_VERTICES_CONFIG_PARAM = "pageRank.numVertices";
-		
-	public Plan getPlan(String ... args) {
-		int parallelism = 1;
-		String pageWithRankInputPath = "";
-		String adjacencyListInputPath = "";
-		String outputPath = "";
-		int numIterations = 25;
-		long numVertices = 5;
-		long numDanglingVertices = 1;
-
-		if (args.length >= 7) {
-			parallelism = Integer.parseInt(args[0]);
-			pageWithRankInputPath = args[1];
-			adjacencyListInputPath = args[2];
-			outputPath = args[3];
-			numIterations = Integer.parseInt(args[4]);
-			numVertices = Long.parseLong(args[5]);
-			numDanglingVertices = Long.parseLong(args[6]);
-		}
-		
-		FileDataSource pageWithRankInput = new FileDataSource(new DanglingPageRankInputFormat(),
-			pageWithRankInputPath, "DanglingPageWithRankInput");
-		pageWithRankInput.getParameters().setLong(DanglingPageRankInputFormat.NUM_VERTICES_PARAMETER, numVertices);
-		
-		BulkIteration iteration = new BulkIteration("Page Rank Loop");
-		iteration.setInput(pageWithRankInput);
-		
-		FileDataSource adjacencyListInput = new FileDataSource(new ImprovedAdjacencyListInputFormat(),
-			adjacencyListInputPath, "AdjancencyListInput");
-		
-		JoinOperator join = JoinOperator.builder(new DotProductMatch(), LongValue.class, 0, 0)
-				.input1(iteration.getPartialSolution())
-				.input2(adjacencyListInput)
-				.name("Join with Edges")
-				.build();
-		
-		CoGroupOperator rankAggregation = CoGroupOperator.builder(new DotProductCoGroup(), LongValue.class, 0, 0)
-				.input1(iteration.getPartialSolution())
-				.input2(join)
-				.name("Rank Aggregation")
-				.build();
-		rankAggregation.getParameters().setLong(DotProductCoGroup.NUM_VERTICES_PARAMETER, numVertices);
-		rankAggregation.getParameters().setLong(DotProductCoGroup.NUM_DANGLING_VERTICES_PARAMETER, numDanglingVertices);
-		
-		iteration.setNextPartialSolution(rankAggregation);
-		iteration.setMaximumNumberOfIterations(numIterations);
-		iteration.getAggregators().registerAggregationConvergenceCriterion(DotProductCoGroup.AGGREGATOR_NAME, new PageRankStatsAggregator(), 
-				new DiffL1NormConvergenceCriterion());
-		
-		FileDataSink out = new FileDataSink(new PageWithRankOutFormat(), outputPath, iteration, "Final Ranks");
-
-		Plan p = new Plan(out, "Dangling PageRank");
-		p.setDefaultParallelism(parallelism);
-		return p;
-	}
-
-	@Override
-	public String getDescription() {
-		return "Parameters: <parallelism> <pages-input-path> <edges-input-path> <output-path> <max-iterations> <num-vertices> <num-dangling-vertices>";
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/SimplePageRank.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/SimplePageRank.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/SimplePageRank.java
deleted file mode 100644
index 31d992f..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/SimplePageRank.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.recordJobs.graph;
-
-import java.io.Serializable;
-import java.util.Iterator;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.Program;
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.java.record.functions.JoinFunction;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFields;
-import org.apache.flink.api.java.record.operators.BulkIteration;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.JoinOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable;
-import org.apache.flink.test.recordJobs.graph.pageRankUtil.DanglingPageRankInputFormat;
-import org.apache.flink.test.recordJobs.graph.pageRankUtil.ImprovedAdjacencyListInputFormat;
-import org.apache.flink.test.recordJobs.graph.pageRankUtil.LongArrayView;
-import org.apache.flink.test.recordJobs.graph.pageRankUtil.PageWithRankOutFormat;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-@SuppressWarnings("deprecation")
-public class SimplePageRank implements Program, ProgramDescription {
-	
-	private static final long serialVersionUID = 1L;
-	
-	private static final String NUM_VERTICES_CONFIG_PARAM = "pageRank.numVertices";
-	
-	// --------------------------------------------------------------------------------------------
-
-	public static final class JoinVerexWithEdgesMatch extends JoinFunction implements Serializable {
-		private static final long serialVersionUID = 1L;
-
-		private Record record = new Record();
-		private LongValue vertexID = new LongValue();
-		private DoubleValue partialRank = new DoubleValue();
-		private DoubleValue rank = new DoubleValue();
-
-		private LongArrayView adjacentNeighbors = new LongArrayView();
-		
-		@Override
-		public void join(Record pageWithRank, Record edges, Collector<Record> out) throws Exception {
-			rank = pageWithRank.getField(1, rank);
-			adjacentNeighbors = edges.getField(1, adjacentNeighbors);
-			int numNeighbors = adjacentNeighbors.size();
-
-			double rankToDistribute = rank.getValue() / (double) numNeighbors;
-
-			partialRank.setValue(rankToDistribute);
-			record.setField(1, partialRank);
-			
-			for (int n = 0; n < numNeighbors; n++) {
-				vertexID.setValue(adjacentNeighbors.getQuick(n));
-				record.setField(0, vertexID);
-				out.collect(record);
-			}
-		}
-	}
-	
-	@Combinable
-	@ConstantFields(0)
-	public static final class AggregatingReduce extends ReduceFunction implements Serializable {
-		private static final long serialVersionUID = 1L;
-		
-		private final DoubleValue sum = new DoubleValue();
-
-		@Override
-		public void reduce(Iterator<Record> pageWithPartialRank, Collector<Record> out) throws Exception {
-			Record rec = null;
-			double rankSum = 0.0;
-			
-			while (pageWithPartialRank.hasNext()) {
-				rec = pageWithPartialRank.next();
-				rankSum += rec.getField(1, DoubleValue.class).getValue();
-			}
-			sum.setValue(rankSum);
-			
-			rec.setField(1, sum);
-			out.collect(rec);
-		}
-	}
-	
-	public static final class JoinOldAndNew extends JoinFunction implements Serializable {
-		private static final long serialVersionUID = 1L;
-
-		private Record record = new Record();
-		private LongValue vertexID = new LongValue();
-		private DoubleValue newRank = new DoubleValue();
-		private DoubleValue rank = new DoubleValue();
-		
-		@Override
-		public void join(Record pageWithRank, Record newPageWithRank, Collector<Record> out) throws Exception {
-			rank = pageWithRank.getField(1, rank);
-			newRank = newPageWithRank.getField(1, newRank);
-			vertexID = pageWithRank.getField(0, vertexID);
-			
-			double epsilon = 0.05;
-			double criterion = rank.getValue() - newRank.getValue();
-			
-			if(Math.abs(criterion) > epsilon)
-			{
-				record.setField(0, new IntValue(1));
-				out.collect(record);
-			}
-		}
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	public Plan getPlan(String ... args) {
-		int parallelism = 1;
-		String pageWithRankInputPath = "";
-		String adjacencyListInputPath = "";
-		String outputPath = "";
-		int numIterations = 25;
-		long numVertices = 5;
-
-		if (args.length >= 6) {
-			parallelism = Integer.parseInt(args[0]);
-			pageWithRankInputPath = args[1];
-			adjacencyListInputPath = args[2];
-			outputPath = args[3];
-			numIterations = Integer.parseInt(args[4]);
-			numVertices = Long.parseLong(args[5]);
-		}
-		
-		FileDataSource pageWithRankInput = new FileDataSource(new DanglingPageRankInputFormat(),
-			pageWithRankInputPath, "PageWithRank Input");
-		pageWithRankInput.getParameters().setLong(NUM_VERTICES_CONFIG_PARAM, numVertices);
-		
-		BulkIteration iteration = new BulkIteration("Page Rank Loop");
-		iteration.setInput(pageWithRankInput);
-		
-		FileDataSource adjacencyListInput = new FileDataSource(new ImprovedAdjacencyListInputFormat(),
-			adjacencyListInputPath, "AdjancencyListInput");
-		
-		JoinOperator join = JoinOperator.builder(new JoinVerexWithEdgesMatch(), LongValue.class, 0, 0)
-				.input1(iteration.getPartialSolution())
-				.input2(adjacencyListInput)
-				.name("Join with Edges")
-				.build();
-		
-		ReduceOperator rankAggregation = ReduceOperator.builder(new AggregatingReduce(), LongValue.class, 0)
-				.input(join)
-				.name("Rank Aggregation")
-				.build();
-		
-		iteration.setNextPartialSolution(rankAggregation);
-		iteration.setMaximumNumberOfIterations(numIterations);
-		
-		JoinOperator termination = JoinOperator.builder(new JoinOldAndNew(), LongValue.class, 0, 0)
-				.input1(iteration.getPartialSolution())
-				.input2(rankAggregation)
-				.name("Join Old and New")
-				.build();
-		
-		iteration.setTerminationCriterion(termination);
-		
-		FileDataSink out = new FileDataSink(new PageWithRankOutFormat(), outputPath, iteration, "Final Ranks");
-
-		Plan p = new Plan(out, "Simple PageRank");
-		p.setDefaultParallelism(parallelism);
-		return p;
-	}
-
-	@Override
-	public String getDescription() {
-		return "Parameters: <parallelism> <pages-input-path> <edges-input-path> <output-path> <max-iterations> <num-vertices> <num-dangling-vertices>";
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/AsciiLongArrayView.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/AsciiLongArrayView.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/AsciiLongArrayView.java
deleted file mode 100644
index 70d531e..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/AsciiLongArrayView.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.recordJobs.graph.pageRankUtil;
-
-import java.io.Serializable;
-
-import com.google.common.base.Charsets;
-
-public class AsciiLongArrayView implements Serializable {
-	private static final long serialVersionUID = 1L;
-
-	private byte[] buffer;
-
-	private int offset;
-
-	private int numBytes;
-
-	private int tokenOffset;
-
-	private int tokenNumBytes;
-
-	private static final int NOT_SET = -1;
-
-	private static final int RADIX_TEN = 10;
-
-	private static final long MULTMIN_RADIX_TEN = Long.MIN_VALUE / 10;
-
-	private static final long N_MULTMAX_RADIX_TEN = -Long.MAX_VALUE / 10;
-
-	public void set(byte[] buffer, int offset, int numBytes) {
-		this.buffer = buffer;
-		this.offset = offset;
-		this.numBytes = numBytes;
-
-		this.tokenOffset = NOT_SET;
-		checkForSingleTrailingWhitespace();
-	}
-
-	private void checkForSingleTrailingWhitespace() {
-		if (Character.isWhitespace((char) buffer[offset + numBytes - 1])) {
-			numBytes--;
-		}
-	}
-
-	public int numElements() {
-		int matches = 0;
-		int pos = offset;
-		while (pos < offset + numBytes) {
-			if (Character.isWhitespace((char) buffer[pos])) {
-				matches++;
-			}
-			pos++;
-		}
-		return matches + 1;
-	}
-
-	public boolean next() {
-
-		if (tokenOffset == NOT_SET) {
-			tokenOffset = offset;
-		} else {
-			tokenOffset += tokenNumBytes + 1;
-			if (tokenOffset > offset + numBytes) {
-				return false;
-			}
-		}
-
-		tokenNumBytes = 1;
-		while (true) {
-			int candidatePos = tokenOffset + tokenNumBytes;
-			if (candidatePos >= offset + numBytes || Character.isWhitespace((char) buffer[candidatePos])) {
-				break;
-			}
-			tokenNumBytes++;
-		}
-
-		return true;
-	}
-
-	private char tokenCharAt(int pos) {
-		return (char) buffer[tokenOffset + pos];
-	}
-
-	public long element() {
-
-		long result = 0;
-		boolean negative = false;
-		int i = 0, max = tokenNumBytes;
-		long limit;
-		long multmin;
-		int digit;
-
-		if (max > 0) {
-			if (tokenCharAt(0) == '-') {
-				negative = true;
-				limit = Long.MIN_VALUE;
-				i++;
-			} else {
-				limit = -Long.MAX_VALUE;
-			}
-
-			multmin = negative ? MULTMIN_RADIX_TEN : N_MULTMAX_RADIX_TEN;
-
-			if (i < max) {
-				digit = Character.digit(tokenCharAt(i++), RADIX_TEN);
-				if (digit < 0) {
-					throw new NumberFormatException(toString());
-				} else {
-					result = -digit;
-				}
-			}
-			while (i < max) {
-				// Accumulating negatively avoids surprises near MAX_VALUE
-				digit = Character.digit(tokenCharAt(i++), RADIX_TEN);
-				if (digit < 0) {
-					throw new NumberFormatException(toString());
-				}
-				if (result < multmin) {
-					throw new NumberFormatException(toString());
-				}
-				result *= RADIX_TEN;
-				if (result < limit + digit) {
-					throw new NumberFormatException(toString());
-				}
-				result -= digit;
-			}
-		} else {
-			throw new NumberFormatException(toString());
-		}
-		if (negative) {
-			if (i > 1) {
-				return result;
-			} else { /* Only got "-" */
-				throw new NumberFormatException(toString());
-			}
-		} else {
-			return -result;
-		}
-	}
-
-	@Override
-	public String toString() {
-		return "[" + new String(buffer, offset, numBytes, Charsets.US_ASCII) + "] (buffer length: " + buffer.length +
-			", offset: " + offset + ", numBytes: " + numBytes + ")";
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/DanglingPageRankInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/DanglingPageRankInputFormat.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/DanglingPageRankInputFormat.java
deleted file mode 100644
index 4a2b360..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/DanglingPageRankInputFormat.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.recordJobs.graph.pageRankUtil;
-
-import org.apache.flink.api.java.record.io.TextInputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.recordJobs.util.ConfigUtils;
-import org.apache.flink.types.BooleanValue;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.Record;
-
-public class DanglingPageRankInputFormat extends TextInputFormat {
-	private static final long serialVersionUID = 1L;
-	
-	public static final String NUM_VERTICES_PARAMETER = "pageRank.numVertices";
-
-	private LongValue vertexID = new LongValue();
-
-	private DoubleValue initialRank;
-
-	private BooleanValue isDangling = new BooleanValue();
-
-	private AsciiLongArrayView arrayView = new AsciiLongArrayView();
-
-	private static final long DANGLING_MARKER = 1l;
-
-	@Override
-	public void configure(Configuration parameters) {
-		long numVertices = ConfigUtils.asLong(NUM_VERTICES_PARAMETER, parameters);
-		initialRank = new DoubleValue(1 / (double) numVertices);
-		super.configure(parameters);
-	}
-
-	@Override
-	public Record readRecord(Record target, byte[] bytes, int offset, int numBytes) {
-
-		arrayView.set(bytes, offset, numBytes);
-
-		try {
-			arrayView.next();
-			vertexID.setValue(arrayView.element());
-
-			if (arrayView.next()) {
-				isDangling.set(arrayView.element() == DANGLING_MARKER);
-			} else {
-				isDangling.set(false);
-			}
-
-		} catch (NumberFormatException e) {
-			throw new RuntimeException("Error parsing " + arrayView.toString(), e);
-		}
-
-		target.clear();
-		target.addField(vertexID);
-		target.addField(initialRank);
-		target.addField(isDangling);
-
-		return target;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/DiffL1NormConvergenceCriterion.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/DiffL1NormConvergenceCriterion.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/DiffL1NormConvergenceCriterion.java
deleted file mode 100644
index 70280a0..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/DiffL1NormConvergenceCriterion.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.recordJobs.graph.pageRankUtil;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
-
-@SuppressWarnings("serial")
-public class DiffL1NormConvergenceCriterion implements ConvergenceCriterion<PageRankStats> {
-
-	private static final double EPSILON = 0.00005;
-
-	private static final Logger log = LoggerFactory.getLogger(DiffL1NormConvergenceCriterion.class);
-
-	@Override
-	public boolean isConverged(int iteration, PageRankStats pageRankStats) {
-		double diff = pageRankStats.diff();
-
-		if (log.isInfoEnabled()) {
-			log.info("Stats in iteration [" + iteration + "]: " + pageRankStats);
-			log.info("L1 norm of the vector difference is [" + diff + "] in iteration [" + iteration + "]");
-		}
-
-		return diff < EPSILON;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/DotProductCoGroup.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/DotProductCoGroup.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/DotProductCoGroup.java
deleted file mode 100644
index d4f7a5c..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/DotProductCoGroup.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.recordJobs.graph.pageRankUtil;
-
-import java.io.Serializable;
-import java.util.Iterator;
-
-import org.apache.flink.api.java.record.functions.CoGroupFunction;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsFirst;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.recordJobs.util.ConfigUtils;
-import org.apache.flink.types.BooleanValue;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-/**
- * In schema is_
- * INPUT = (pageId, currentRank, dangling), (pageId, partialRank).
- * OUTPUT = (pageId, newRank, dangling)
- */
-@SuppressWarnings("deprecation")
-@ConstantFieldsFirst(0)
-public class DotProductCoGroup extends CoGroupFunction implements Serializable {
-	private static final long serialVersionUID = 1L;
-	
-	public static final String NUM_VERTICES_PARAMETER = "pageRank.numVertices";
-	
-	public static final String NUM_DANGLING_VERTICES_PARAMETER = "pageRank.numDanglingVertices";
-	
-	public static final String AGGREGATOR_NAME = "pagerank.aggregator";
-	
-	private static final double BETA = 0.85;
-
-	
-	private PageRankStatsAggregator aggregator;
-
-	private long numVertices;
-
-	private long numDanglingVertices;
-
-	private double dampingFactor;
-
-	private double danglingRankFactor;
-	
-	
-	private Record accumulator = new Record();
-
-	private final DoubleValue newRank = new DoubleValue();
-
-	private BooleanValue isDangling = new BooleanValue();
-
-	private LongValue vertexID = new LongValue();
-
-	private DoubleValue doubleInstance = new DoubleValue();
-
-	@Override
-	public void open(Configuration parameters) throws Exception {
-		int currentIteration = getIterationRuntimeContext().getSuperstepNumber();
-		
-		numVertices = ConfigUtils.asLong(NUM_VERTICES_PARAMETER, parameters);
-		numDanglingVertices = ConfigUtils.asLong(NUM_DANGLING_VERTICES_PARAMETER, parameters);
-
-		dampingFactor = (1d - BETA) / (double) numVertices;
-		
-		aggregator = getIterationRuntimeContext().getIterationAggregator(AGGREGATOR_NAME);
-		
-		if (currentIteration == 1) {
-			danglingRankFactor = BETA * (double) numDanglingVertices / ((double) numVertices * (double) numVertices);
-		} else {
-			PageRankStats previousAggregate = getIterationRuntimeContext().getPreviousIterationAggregate(AGGREGATOR_NAME);
-			danglingRankFactor = BETA * previousAggregate.danglingRank() / (double) numVertices;
-		}
-	}
-
-	@Override
-	public void coGroup(Iterator<Record> currentPageRankIterator, Iterator<Record> partialRanks,
-			Collector<Record> collector)
-	{
-		if (!currentPageRankIterator.hasNext()) {
-			long missingVertex = partialRanks.next().getField(0, LongValue.class).getValue();
-			throw new IllegalStateException("No current page rank for vertex [" + missingVertex + "]!");
-		}
-
-		Record currentPageRank = currentPageRankIterator.next();
-
-		long edges = 0;
-		double summedRank = 0;
-		while (partialRanks.hasNext()) {
-			summedRank += partialRanks.next().getField(1, doubleInstance).getValue();
-			edges++;
-		}
-
-		double rank = BETA * summedRank + dampingFactor + danglingRankFactor;
-		double currentRank = currentPageRank.getField(1, doubleInstance).getValue();
-		isDangling = currentPageRank.getField(2, isDangling);
-		
-		// maintain statistics to compensate for probability loss on dangling nodes
-		double danglingRankToAggregate = isDangling.get() ? rank : 0;
-		long danglingVerticesToAggregate = isDangling.get() ? 1 : 0;
-		double diff = Math.abs(currentRank - rank);
-		aggregator.aggregate(diff, rank, danglingRankToAggregate, danglingVerticesToAggregate, 1, edges);
-		
-		// return the new record
-		newRank.setValue(rank);
-		accumulator.setField(0, currentPageRank.getField(0, vertexID));
-		accumulator.setField(1, newRank);
-		accumulator.setField(2, isDangling);
-		collector.collect(accumulator);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/DotProductMatch.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/DotProductMatch.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/DotProductMatch.java
deleted file mode 100644
index 339cef5..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/DotProductMatch.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.recordJobs.graph.pageRankUtil;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.java.record.functions.JoinFunction;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-/**
- * In schema is_
- * INPUT = (pageId, rank, dangling), (pageId, neighbors-list).
- * OUTPUT = (targetPageId, partialRank)
- */
-@SuppressWarnings("deprecation")
-public class DotProductMatch extends JoinFunction implements Serializable {
-	private static final long serialVersionUID = 1L;
-
-	private Record record = new Record();
-	private LongValue vertexID = new LongValue();
-	private DoubleValue partialRank = new DoubleValue();
-	private DoubleValue rank = new DoubleValue();
-
-	private LongArrayView adjacentNeighbors = new LongArrayView();
-
-	@Override
-	public void join(Record pageWithRank, Record adjacencyList, Collector<Record> collector) throws Exception {
-
-		rank = pageWithRank.getField(1, rank);
-		adjacentNeighbors = adjacencyList.getField(1, adjacentNeighbors);
-		int numNeighbors = adjacentNeighbors.size();
-
-		double rankToDistribute = rank.getValue() / (double) numNeighbors;
-
-		partialRank.setValue(rankToDistribute);
-		record.setField(1, partialRank);
-
-		for (int n = 0; n < numNeighbors; n++) {
-			vertexID.setValue(adjacentNeighbors.getQuick(n));
-			record.setField(0, vertexID);
-			collector.collect(record);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/ImprovedAdjacencyListInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/ImprovedAdjacencyListInputFormat.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/ImprovedAdjacencyListInputFormat.java
deleted file mode 100644
index 6db4122..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/ImprovedAdjacencyListInputFormat.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.recordJobs.graph.pageRankUtil;
-
-import org.apache.flink.api.java.record.io.TextInputFormat;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.Record;
-
-public class ImprovedAdjacencyListInputFormat extends TextInputFormat {
-	private static final long serialVersionUID = 1L;
-
-	private final LongValue vertexID = new LongValue();
-
-	private final AsciiLongArrayView arrayView = new AsciiLongArrayView();
-
-	private final LongArrayView adjacentVertices = new LongArrayView();
-
-	@Override
-	public Record readRecord(Record target, byte[] bytes, int offset, int numBytes) {
-
-		if (numBytes == 0) {
-			return null;
-		}
-
-		arrayView.set(bytes, offset, numBytes);
-
-		int numElements = arrayView.numElements();
-		adjacentVertices.allocate(numElements - 1);
-
-		try {
-
-			int pos = 0;
-			while (arrayView.next()) {
-
-				if (pos == 0) {
-					vertexID.setValue(arrayView.element());
-				} else {
-					adjacentVertices.setQuick(pos - 1, arrayView.element());
-				}
-
-				pos++;
-			}
-
-			// sanity check
-			if (pos != numElements) {
-				throw new IllegalStateException("Should have gotten " + numElements + " elements, but saw " + pos);
-			}
-
-		} catch (RuntimeException e) {
-			throw new RuntimeException("Error parsing: " + arrayView.toString(), e);
-		}
-
-		target.clear();
-		target.addField(vertexID);
-		target.addField(adjacentVertices);
-
-		return target;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/LongArrayView.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/LongArrayView.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/LongArrayView.java
deleted file mode 100644
index 770274b..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/LongArrayView.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.recordJobs.graph.pageRankUtil;
-
-import java.io.IOException;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.types.Value;
-
-public class LongArrayView implements Value {
-	private static final long serialVersionUID = 1L;
-
-	private long[] entries = new long[0];
-
-	private int numEntries = 0;
-
-	public LongArrayView() {
-	}
-
-	public long get(int index) {
-		if (index >= numEntries) {
-			throw new ArrayIndexOutOfBoundsException();
-		}
-		return getQuick(index);
-	}
-
-	public long getQuick(int index) {
-		return entries[index];
-	}
-
-	public void allocate(int numEntries) {
-		this.numEntries = numEntries;
-		ensureCapacity();
-	}
-
-	public void set(int index, long value) {
-		if (index >= numEntries) {
-			throw new ArrayIndexOutOfBoundsException();
-		}
-		setQuick(index, value);
-	}
-
-	public void setQuick(int index, long value) {
-		entries[index] = value;
-	}
-
-	public int size() {
-		return numEntries;
-	}
-
-	private void ensureCapacity() {
-		if (entries.length < numEntries) {
-			entries = new long[numEntries];
-		}
-	}
-
-	public void write(DataOutputView out) throws IOException {
-		out.writeInt(numEntries);
-		for (int n = 0; n < numEntries; n++) {
-			out.writeLong(entries[n]);
-		}
-	}
-
-	public void read(DataInputView in) throws IOException {
-		numEntries = in.readInt();
-		ensureCapacity();
-		for (int n = 0; n < numEntries; n++) {
-			entries[n] = in.readLong();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/PageRankStats.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/PageRankStats.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/PageRankStats.java
deleted file mode 100644
index c6d06f5..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/PageRankStats.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.recordJobs.graph.pageRankUtil;
-
-import java.io.IOException;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.types.Value;
-
-public class PageRankStats implements Value {
-	private static final long serialVersionUID = 1L;
-
-	private double diff;
-
-	private double rank;
-
-	private double danglingRank;
-
-	private long numDanglingVertices;
-
-	private long numVertices;
-
-	private long edges;
-
-	public PageRankStats() {
-	}
-
-	public PageRankStats(double diff, double rank, double danglingRank, long numDanglingVertices, long numVertices, long edges) {
-		this.diff = diff;
-		this.rank = rank;
-		this.danglingRank = danglingRank;
-		this.numDanglingVertices = numDanglingVertices;
-		this.numVertices = numVertices;
-		this.edges = edges;
-	}
-
-	public double diff() {
-		return diff;
-	}
-
-	public double rank() {
-		return rank;
-	}
-
-	public double danglingRank() {
-		return danglingRank;
-	}
-
-	public long numDanglingVertices() {
-		return numDanglingVertices;
-	}
-
-	public long numVertices() {
-		return numVertices;
-	}
-
-	public long edges() {
-		return edges;
-	}
-
-	@Override
-	public void write(DataOutputView out) throws IOException {
-		out.writeDouble(diff);
-		out.writeDouble(rank);
-		out.writeDouble(danglingRank);
-		out.writeLong(numDanglingVertices);
-		out.writeLong(numVertices);
-		out.writeLong(edges);
-	}
-
-	@Override
-	public void read(DataInputView in) throws IOException {
-		diff = in.readDouble();
-		rank = in.readDouble();
-		danglingRank = in.readDouble();
-		numDanglingVertices = in.readLong();
-		numVertices = in.readLong();
-		edges = in.readLong();
-	}
-
-	@Override
-	public String toString() {
-		return "PageRankStats: diff [" + diff + "], rank [" + rank + "], danglingRank [" + danglingRank +
-			"], numDanglingVertices [" + numDanglingVertices + "], numVertices [" + numVertices + "], edges [" + edges +
-			"]";
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/PageRankStatsAggregator.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/PageRankStatsAggregator.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/PageRankStatsAggregator.java
deleted file mode 100644
index 1457493..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/PageRankStatsAggregator.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.recordJobs.graph.pageRankUtil;
-
-import org.apache.flink.api.common.aggregators.Aggregator;
-
-@SuppressWarnings("serial")
-public class PageRankStatsAggregator implements Aggregator<PageRankStats> {
-
-	private double diff = 0;
-
-	private double rank = 0;
-
-	private double danglingRank = 0;
-
-	private long numDanglingVertices = 0;
-
-	private long numVertices = 0;
-
-	private long edges = 0;
-
-	@Override
-	public PageRankStats getAggregate() {
-		return new PageRankStats(diff, rank, danglingRank, numDanglingVertices, numVertices, edges);
-	}
-
-	public void aggregate(double diffDelta, double rankDelta, double danglingRankDelta, long danglingVerticesDelta,
-			long verticesDelta, long edgesDelta) {
-		diff += diffDelta;
-		rank += rankDelta;
-		danglingRank += danglingRankDelta;
-		numDanglingVertices += danglingVerticesDelta;
-		numVertices += verticesDelta;
-		edges += edgesDelta;
-	}
-
-	@Override
-	public void aggregate(PageRankStats pageRankStats) {
-		diff += pageRankStats.diff();
-		rank += pageRankStats.rank();
-		danglingRank += pageRankStats.danglingRank();
-		numDanglingVertices += pageRankStats.numDanglingVertices();
-		numVertices += pageRankStats.numVertices();
-		edges += pageRankStats.edges();
-	}
-
-	@Override
-	public void reset() {
-		diff = 0;
-		rank = 0;
-		danglingRank = 0;
-		numDanglingVertices = 0;
-		numVertices = 0;
-		edges = 0;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/PageWithRankOutFormat.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/PageWithRankOutFormat.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/PageWithRankOutFormat.java
deleted file mode 100644
index 6c6dc42..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/PageWithRankOutFormat.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.recordJobs.graph.pageRankUtil;
-
-import org.apache.flink.api.java.record.io.DelimitedOutputFormat;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.Record;
-
-public class PageWithRankOutFormat extends DelimitedOutputFormat {
-	private static final long serialVersionUID = 1L;
-
-	private final StringBuilder buffer = new StringBuilder();
-
-	@Override
-	public int serializeRecord(Record record, byte[] target) {
-		StringBuilder buffer = this.buffer;
-		
-		buffer.setLength(0);
-		buffer.append(record.getField(0, LongValue.class).toString());
-		buffer.append('\t');
-		buffer.append(record.getField(1, DoubleValue.class).toString());
-		buffer.append('\n');
-		
-		if (target.length < buffer.length()) {
-			return -buffer.length();
-		}
-		
-		for (int i = 0; i < buffer.length(); i++) {
-			target[i] = (byte) buffer.charAt(i);
-		}
-		return buffer.length();
-	}
-}