You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/30 23:44:37 UTC
[2/5] flink git commit: [FLINK-1681] [tests] Remove outdated
'nephele' iteration tests.
http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/PageRankStats.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/PageRankStats.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/PageRankStats.java
deleted file mode 100644
index e093a48..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/PageRankStats.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.iterative.nephele.danglingpagerank;
-
-import java.io.IOException;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.types.Value;
-
-public class PageRankStats implements Value {
- private static final long serialVersionUID = 1L;
-
- private double diff;
-
- private double rank;
-
- private double danglingRank;
-
- private long numDanglingVertices;
-
- private long numVertices;
-
- private long edges;
-
- private double summedRank;
-
- private double finalDiff;
-
- public PageRankStats() {
- }
-
- public PageRankStats(double diff, double rank, double danglingRank, long numDanglingVertices, long numVertices,
- long edges, double summedRank, double finalDiff) {
- this.diff = diff;
- this.rank = rank;
- this.danglingRank = danglingRank;
- this.numDanglingVertices = numDanglingVertices;
- this.numVertices = numVertices;
- this.edges = edges;
- this.summedRank = summedRank;
- this.finalDiff = finalDiff;
- }
-
- public double diff() {
- return diff;
- }
-
- public double rank() {
- return rank;
- }
-
- public double danglingRank() {
- return danglingRank;
- }
-
- public long numDanglingVertices() {
- return numDanglingVertices;
- }
-
- public long numVertices() {
- return numVertices;
- }
-
- public long edges() {
- return edges;
- }
-
- public double summedRank() {
- return summedRank;
- }
-
- public double finalDiff() {
- return finalDiff;
- }
-
- @Override
- public void write(DataOutputView out) throws IOException {
- out.writeDouble(diff);
- out.writeDouble(rank);
- out.writeDouble(danglingRank);
- out.writeLong(numDanglingVertices);
- out.writeLong(numVertices);
- out.writeLong(edges);
- out.writeDouble(summedRank);
- out.writeDouble(finalDiff);
- }
-
- @Override
- public void read(DataInputView in) throws IOException {
- diff = in.readDouble();
- rank = in.readDouble();
- danglingRank = in.readDouble();
- numDanglingVertices = in.readLong();
- numVertices = in.readLong();
- edges = in.readLong();
- summedRank = in.readDouble();
- finalDiff = in.readDouble();
- }
-
- @Override
- public String toString() {
- return "PageRankStats: diff [" + diff + "], rank [" + rank + "], danglingRank [" + danglingRank +
- "], numDanglingVertices [" + numDanglingVertices + "], numVertices [" + numVertices + "], edges [" + edges +
- "], summedRank [" + summedRank + "], finalDiff [" + finalDiff + "]";
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/PageRankStatsAggregator.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/PageRankStatsAggregator.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/PageRankStatsAggregator.java
deleted file mode 100644
index 4b41e45..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/PageRankStatsAggregator.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.iterative.nephele.danglingpagerank;
-
-import org.apache.flink.api.common.aggregators.Aggregator;
-
-@SuppressWarnings("serial")
-public class PageRankStatsAggregator implements Aggregator<PageRankStats> {
-
- private double diff = 0;
-
- private double rank = 0;
-
- private double danglingRank = 0;
-
- private long numDanglingVertices = 0;
-
- private long numVertices = 0;
-
- private long edges = 0;
-
- private double summedRank = 0;
-
- private double finalDiff = 0;
-
- @Override
- public PageRankStats getAggregate() {
- return new PageRankStats(diff, rank, danglingRank, numDanglingVertices, numVertices, edges, summedRank,
- finalDiff);
- }
-
- public void aggregate(double diffDelta, double rankDelta, double danglingRankDelta, long danglingVerticesDelta,
- long verticesDelta, long edgesDelta, double summedRankDelta, double finalDiffDelta) {
- diff += diffDelta;
- rank += rankDelta;
- danglingRank += danglingRankDelta;
- numDanglingVertices += danglingVerticesDelta;
- numVertices += verticesDelta;
- edges += edgesDelta;
- summedRank += summedRankDelta;
- finalDiff += finalDiffDelta;
- }
-
- @Override
- public void aggregate(PageRankStats pageRankStats) {
- diff += pageRankStats.diff();
- rank += pageRankStats.rank();
- danglingRank += pageRankStats.danglingRank();
- numDanglingVertices += pageRankStats.numDanglingVertices();
- numVertices += pageRankStats.numVertices();
- edges += pageRankStats.edges();
- summedRank += pageRankStats.summedRank();
- finalDiff += pageRankStats.finalDiff();
- }
-
- @Override
- public void reset() {
- diff = 0;
- rank = 0;
- danglingRank = 0;
- numDanglingVertices = 0;
- numVertices = 0;
- edges = 0;
- summedRank = 0;
- finalDiff = 0;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/PageWithRankOutFormat.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/PageWithRankOutFormat.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/PageWithRankOutFormat.java
deleted file mode 100644
index 5cd520f..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/PageWithRankOutFormat.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.iterative.nephele.danglingpagerank;
-
-import com.google.common.base.Charsets;
-
-import java.io.IOException;
-
-import org.apache.flink.api.java.record.io.FileOutputFormat;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.Record;
-
-public class PageWithRankOutFormat extends FileOutputFormat {
- private static final long serialVersionUID = 1L;
-
- private final StringBuilder buffer = new StringBuilder();
-
- @Override
- public void writeRecord(Record record) throws IOException {
- buffer.setLength(0);
- buffer.append(record.getField(0, LongValue.class).toString());
- buffer.append('\t');
- buffer.append(record.getField(1, DoubleValue.class).toString());
- buffer.append('\n');
-
- byte[] bytes = buffer.toString().getBytes(Charsets.UTF_8);
- stream.write(bytes);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/DanglingPageRank.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/DanglingPageRank.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/DanglingPageRank.java
deleted file mode 100644
index 80ba91a..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/DanglingPageRank.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.recordJobs.graph;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.Program;
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.java.record.operators.BulkIteration;
-import org.apache.flink.api.java.record.operators.CoGroupOperator;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.JoinOperator;
-import org.apache.flink.test.recordJobs.graph.pageRankUtil.DanglingPageRankInputFormat;
-import org.apache.flink.test.recordJobs.graph.pageRankUtil.DiffL1NormConvergenceCriterion;
-import org.apache.flink.test.recordJobs.graph.pageRankUtil.DotProductCoGroup;
-import org.apache.flink.test.recordJobs.graph.pageRankUtil.DotProductMatch;
-import org.apache.flink.test.recordJobs.graph.pageRankUtil.ImprovedAdjacencyListInputFormat;
-import org.apache.flink.test.recordJobs.graph.pageRankUtil.PageRankStatsAggregator;
-import org.apache.flink.test.recordJobs.graph.pageRankUtil.PageWithRankOutFormat;
-import org.apache.flink.types.LongValue;
-
-@SuppressWarnings("deprecation")
-public class DanglingPageRank implements Program, ProgramDescription {
-
- private static final long serialVersionUID = 1L;
-
- public static final String NUM_VERTICES_CONFIG_PARAM = "pageRank.numVertices";
-
- public Plan getPlan(String ... args) {
- int parallelism = 1;
- String pageWithRankInputPath = "";
- String adjacencyListInputPath = "";
- String outputPath = "";
- int numIterations = 25;
- long numVertices = 5;
- long numDanglingVertices = 1;
-
- if (args.length >= 7) {
- parallelism = Integer.parseInt(args[0]);
- pageWithRankInputPath = args[1];
- adjacencyListInputPath = args[2];
- outputPath = args[3];
- numIterations = Integer.parseInt(args[4]);
- numVertices = Long.parseLong(args[5]);
- numDanglingVertices = Long.parseLong(args[6]);
- }
-
- FileDataSource pageWithRankInput = new FileDataSource(new DanglingPageRankInputFormat(),
- pageWithRankInputPath, "DanglingPageWithRankInput");
- pageWithRankInput.getParameters().setLong(DanglingPageRankInputFormat.NUM_VERTICES_PARAMETER, numVertices);
-
- BulkIteration iteration = new BulkIteration("Page Rank Loop");
- iteration.setInput(pageWithRankInput);
-
- FileDataSource adjacencyListInput = new FileDataSource(new ImprovedAdjacencyListInputFormat(),
- adjacencyListInputPath, "AdjancencyListInput");
-
- JoinOperator join = JoinOperator.builder(new DotProductMatch(), LongValue.class, 0, 0)
- .input1(iteration.getPartialSolution())
- .input2(adjacencyListInput)
- .name("Join with Edges")
- .build();
-
- CoGroupOperator rankAggregation = CoGroupOperator.builder(new DotProductCoGroup(), LongValue.class, 0, 0)
- .input1(iteration.getPartialSolution())
- .input2(join)
- .name("Rank Aggregation")
- .build();
- rankAggregation.getParameters().setLong(DotProductCoGroup.NUM_VERTICES_PARAMETER, numVertices);
- rankAggregation.getParameters().setLong(DotProductCoGroup.NUM_DANGLING_VERTICES_PARAMETER, numDanglingVertices);
-
- iteration.setNextPartialSolution(rankAggregation);
- iteration.setMaximumNumberOfIterations(numIterations);
- iteration.getAggregators().registerAggregationConvergenceCriterion(DotProductCoGroup.AGGREGATOR_NAME, new PageRankStatsAggregator(),
- new DiffL1NormConvergenceCriterion());
-
- FileDataSink out = new FileDataSink(new PageWithRankOutFormat(), outputPath, iteration, "Final Ranks");
-
- Plan p = new Plan(out, "Dangling PageRank");
- p.setDefaultParallelism(parallelism);
- return p;
- }
-
- @Override
- public String getDescription() {
- return "Parameters: <parallelism> <pages-input-path> <edges-input-path> <output-path> <max-iterations> <num-vertices> <num-dangling-vertices>";
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/SimplePageRank.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/SimplePageRank.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/SimplePageRank.java
deleted file mode 100644
index 31d992f..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/SimplePageRank.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.recordJobs.graph;
-
-import java.io.Serializable;
-import java.util.Iterator;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.Program;
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.java.record.functions.JoinFunction;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFields;
-import org.apache.flink.api.java.record.operators.BulkIteration;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.JoinOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable;
-import org.apache.flink.test.recordJobs.graph.pageRankUtil.DanglingPageRankInputFormat;
-import org.apache.flink.test.recordJobs.graph.pageRankUtil.ImprovedAdjacencyListInputFormat;
-import org.apache.flink.test.recordJobs.graph.pageRankUtil.LongArrayView;
-import org.apache.flink.test.recordJobs.graph.pageRankUtil.PageWithRankOutFormat;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-@SuppressWarnings("deprecation")
-public class SimplePageRank implements Program, ProgramDescription {
-
- private static final long serialVersionUID = 1L;
-
- private static final String NUM_VERTICES_CONFIG_PARAM = "pageRank.numVertices";
-
- // --------------------------------------------------------------------------------------------
-
- public static final class JoinVerexWithEdgesMatch extends JoinFunction implements Serializable {
- private static final long serialVersionUID = 1L;
-
- private Record record = new Record();
- private LongValue vertexID = new LongValue();
- private DoubleValue partialRank = new DoubleValue();
- private DoubleValue rank = new DoubleValue();
-
- private LongArrayView adjacentNeighbors = new LongArrayView();
-
- @Override
- public void join(Record pageWithRank, Record edges, Collector<Record> out) throws Exception {
- rank = pageWithRank.getField(1, rank);
- adjacentNeighbors = edges.getField(1, adjacentNeighbors);
- int numNeighbors = adjacentNeighbors.size();
-
- double rankToDistribute = rank.getValue() / (double) numNeighbors;
-
- partialRank.setValue(rankToDistribute);
- record.setField(1, partialRank);
-
- for (int n = 0; n < numNeighbors; n++) {
- vertexID.setValue(adjacentNeighbors.getQuick(n));
- record.setField(0, vertexID);
- out.collect(record);
- }
- }
- }
-
- @Combinable
- @ConstantFields(0)
- public static final class AggregatingReduce extends ReduceFunction implements Serializable {
- private static final long serialVersionUID = 1L;
-
- private final DoubleValue sum = new DoubleValue();
-
- @Override
- public void reduce(Iterator<Record> pageWithPartialRank, Collector<Record> out) throws Exception {
- Record rec = null;
- double rankSum = 0.0;
-
- while (pageWithPartialRank.hasNext()) {
- rec = pageWithPartialRank.next();
- rankSum += rec.getField(1, DoubleValue.class).getValue();
- }
- sum.setValue(rankSum);
-
- rec.setField(1, sum);
- out.collect(rec);
- }
- }
-
- public static final class JoinOldAndNew extends JoinFunction implements Serializable {
- private static final long serialVersionUID = 1L;
-
- private Record record = new Record();
- private LongValue vertexID = new LongValue();
- private DoubleValue newRank = new DoubleValue();
- private DoubleValue rank = new DoubleValue();
-
- @Override
- public void join(Record pageWithRank, Record newPageWithRank, Collector<Record> out) throws Exception {
- rank = pageWithRank.getField(1, rank);
- newRank = newPageWithRank.getField(1, newRank);
- vertexID = pageWithRank.getField(0, vertexID);
-
- double epsilon = 0.05;
- double criterion = rank.getValue() - newRank.getValue();
-
- if(Math.abs(criterion) > epsilon)
- {
- record.setField(0, new IntValue(1));
- out.collect(record);
- }
- }
- }
-
- // --------------------------------------------------------------------------------------------
-
- public Plan getPlan(String ... args) {
- int parallelism = 1;
- String pageWithRankInputPath = "";
- String adjacencyListInputPath = "";
- String outputPath = "";
- int numIterations = 25;
- long numVertices = 5;
-
- if (args.length >= 6) {
- parallelism = Integer.parseInt(args[0]);
- pageWithRankInputPath = args[1];
- adjacencyListInputPath = args[2];
- outputPath = args[3];
- numIterations = Integer.parseInt(args[4]);
- numVertices = Long.parseLong(args[5]);
- }
-
- FileDataSource pageWithRankInput = new FileDataSource(new DanglingPageRankInputFormat(),
- pageWithRankInputPath, "PageWithRank Input");
- pageWithRankInput.getParameters().setLong(NUM_VERTICES_CONFIG_PARAM, numVertices);
-
- BulkIteration iteration = new BulkIteration("Page Rank Loop");
- iteration.setInput(pageWithRankInput);
-
- FileDataSource adjacencyListInput = new FileDataSource(new ImprovedAdjacencyListInputFormat(),
- adjacencyListInputPath, "AdjancencyListInput");
-
- JoinOperator join = JoinOperator.builder(new JoinVerexWithEdgesMatch(), LongValue.class, 0, 0)
- .input1(iteration.getPartialSolution())
- .input2(adjacencyListInput)
- .name("Join with Edges")
- .build();
-
- ReduceOperator rankAggregation = ReduceOperator.builder(new AggregatingReduce(), LongValue.class, 0)
- .input(join)
- .name("Rank Aggregation")
- .build();
-
- iteration.setNextPartialSolution(rankAggregation);
- iteration.setMaximumNumberOfIterations(numIterations);
-
- JoinOperator termination = JoinOperator.builder(new JoinOldAndNew(), LongValue.class, 0, 0)
- .input1(iteration.getPartialSolution())
- .input2(rankAggregation)
- .name("Join Old and New")
- .build();
-
- iteration.setTerminationCriterion(termination);
-
- FileDataSink out = new FileDataSink(new PageWithRankOutFormat(), outputPath, iteration, "Final Ranks");
-
- Plan p = new Plan(out, "Simple PageRank");
- p.setDefaultParallelism(parallelism);
- return p;
- }
-
- @Override
- public String getDescription() {
- return "Parameters: <parallelism> <pages-input-path> <edges-input-path> <output-path> <max-iterations> <num-vertices> <num-dangling-vertices>";
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/AsciiLongArrayView.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/AsciiLongArrayView.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/AsciiLongArrayView.java
deleted file mode 100644
index 70d531e..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/AsciiLongArrayView.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.recordJobs.graph.pageRankUtil;
-
-import java.io.Serializable;
-
-import com.google.common.base.Charsets;
-
-public class AsciiLongArrayView implements Serializable {
- private static final long serialVersionUID = 1L;
-
- private byte[] buffer;
-
- private int offset;
-
- private int numBytes;
-
- private int tokenOffset;
-
- private int tokenNumBytes;
-
- private static final int NOT_SET = -1;
-
- private static final int RADIX_TEN = 10;
-
- private static final long MULTMIN_RADIX_TEN = Long.MIN_VALUE / 10;
-
- private static final long N_MULTMAX_RADIX_TEN = -Long.MAX_VALUE / 10;
-
- public void set(byte[] buffer, int offset, int numBytes) {
- this.buffer = buffer;
- this.offset = offset;
- this.numBytes = numBytes;
-
- this.tokenOffset = NOT_SET;
- checkForSingleTrailingWhitespace();
- }
-
- private void checkForSingleTrailingWhitespace() {
- if (Character.isWhitespace((char) buffer[offset + numBytes - 1])) {
- numBytes--;
- }
- }
-
- public int numElements() {
- int matches = 0;
- int pos = offset;
- while (pos < offset + numBytes) {
- if (Character.isWhitespace((char) buffer[pos])) {
- matches++;
- }
- pos++;
- }
- return matches + 1;
- }
-
- public boolean next() {
-
- if (tokenOffset == NOT_SET) {
- tokenOffset = offset;
- } else {
- tokenOffset += tokenNumBytes + 1;
- if (tokenOffset > offset + numBytes) {
- return false;
- }
- }
-
- tokenNumBytes = 1;
- while (true) {
- int candidatePos = tokenOffset + tokenNumBytes;
- if (candidatePos >= offset + numBytes || Character.isWhitespace((char) buffer[candidatePos])) {
- break;
- }
- tokenNumBytes++;
- }
-
- return true;
- }
-
- private char tokenCharAt(int pos) {
- return (char) buffer[tokenOffset + pos];
- }
-
- public long element() {
-
- long result = 0;
- boolean negative = false;
- int i = 0, max = tokenNumBytes;
- long limit;
- long multmin;
- int digit;
-
- if (max > 0) {
- if (tokenCharAt(0) == '-') {
- negative = true;
- limit = Long.MIN_VALUE;
- i++;
- } else {
- limit = -Long.MAX_VALUE;
- }
-
- multmin = negative ? MULTMIN_RADIX_TEN : N_MULTMAX_RADIX_TEN;
-
- if (i < max) {
- digit = Character.digit(tokenCharAt(i++), RADIX_TEN);
- if (digit < 0) {
- throw new NumberFormatException(toString());
- } else {
- result = -digit;
- }
- }
- while (i < max) {
- // Accumulating negatively avoids surprises near MAX_VALUE
- digit = Character.digit(tokenCharAt(i++), RADIX_TEN);
- if (digit < 0) {
- throw new NumberFormatException(toString());
- }
- if (result < multmin) {
- throw new NumberFormatException(toString());
- }
- result *= RADIX_TEN;
- if (result < limit + digit) {
- throw new NumberFormatException(toString());
- }
- result -= digit;
- }
- } else {
- throw new NumberFormatException(toString());
- }
- if (negative) {
- if (i > 1) {
- return result;
- } else { /* Only got "-" */
- throw new NumberFormatException(toString());
- }
- } else {
- return -result;
- }
- }
-
- @Override
- public String toString() {
- return "[" + new String(buffer, offset, numBytes, Charsets.US_ASCII) + "] (buffer length: " + buffer.length +
- ", offset: " + offset + ", numBytes: " + numBytes + ")";
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/DanglingPageRankInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/DanglingPageRankInputFormat.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/DanglingPageRankInputFormat.java
deleted file mode 100644
index 4a2b360..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/DanglingPageRankInputFormat.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.recordJobs.graph.pageRankUtil;
-
-import org.apache.flink.api.java.record.io.TextInputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.recordJobs.util.ConfigUtils;
-import org.apache.flink.types.BooleanValue;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.Record;
-
-public class DanglingPageRankInputFormat extends TextInputFormat {
- private static final long serialVersionUID = 1L;
-
- public static final String NUM_VERTICES_PARAMETER = "pageRank.numVertices";
-
- private LongValue vertexID = new LongValue();
-
- private DoubleValue initialRank;
-
- private BooleanValue isDangling = new BooleanValue();
-
- private AsciiLongArrayView arrayView = new AsciiLongArrayView();
-
- private static final long DANGLING_MARKER = 1l;
-
- @Override
- public void configure(Configuration parameters) {
- long numVertices = ConfigUtils.asLong(NUM_VERTICES_PARAMETER, parameters);
- initialRank = new DoubleValue(1 / (double) numVertices);
- super.configure(parameters);
- }
-
- @Override
- public Record readRecord(Record target, byte[] bytes, int offset, int numBytes) {
-
- arrayView.set(bytes, offset, numBytes);
-
- try {
- arrayView.next();
- vertexID.setValue(arrayView.element());
-
- if (arrayView.next()) {
- isDangling.set(arrayView.element() == DANGLING_MARKER);
- } else {
- isDangling.set(false);
- }
-
- } catch (NumberFormatException e) {
- throw new RuntimeException("Error parsing " + arrayView.toString(), e);
- }
-
- target.clear();
- target.addField(vertexID);
- target.addField(initialRank);
- target.addField(isDangling);
-
- return target;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/DiffL1NormConvergenceCriterion.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/DiffL1NormConvergenceCriterion.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/DiffL1NormConvergenceCriterion.java
deleted file mode 100644
index 70280a0..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/DiffL1NormConvergenceCriterion.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.recordJobs.graph.pageRankUtil;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
-
-@SuppressWarnings("serial")
-public class DiffL1NormConvergenceCriterion implements ConvergenceCriterion<PageRankStats> {
-
- private static final double EPSILON = 0.00005;
-
- private static final Logger log = LoggerFactory.getLogger(DiffL1NormConvergenceCriterion.class);
-
- @Override
- public boolean isConverged(int iteration, PageRankStats pageRankStats) {
- double diff = pageRankStats.diff();
-
- if (log.isInfoEnabled()) {
- log.info("Stats in iteration [" + iteration + "]: " + pageRankStats);
- log.info("L1 norm of the vector difference is [" + diff + "] in iteration [" + iteration + "]");
- }
-
- return diff < EPSILON;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/DotProductCoGroup.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/DotProductCoGroup.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/DotProductCoGroup.java
deleted file mode 100644
index d4f7a5c..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/DotProductCoGroup.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.recordJobs.graph.pageRankUtil;
-
-import java.io.Serializable;
-import java.util.Iterator;
-
-import org.apache.flink.api.java.record.functions.CoGroupFunction;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsFirst;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.recordJobs.util.ConfigUtils;
-import org.apache.flink.types.BooleanValue;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-/**
- * In schema is_
- * INPUT = (pageId, currentRank, dangling), (pageId, partialRank).
- * OUTPUT = (pageId, newRank, dangling)
- */
-@SuppressWarnings("deprecation")
-@ConstantFieldsFirst(0)
-public class DotProductCoGroup extends CoGroupFunction implements Serializable {
- private static final long serialVersionUID = 1L;
-
- public static final String NUM_VERTICES_PARAMETER = "pageRank.numVertices";
-
- public static final String NUM_DANGLING_VERTICES_PARAMETER = "pageRank.numDanglingVertices";
-
- public static final String AGGREGATOR_NAME = "pagerank.aggregator";
-
- private static final double BETA = 0.85;
-
-
- private PageRankStatsAggregator aggregator;
-
- private long numVertices;
-
- private long numDanglingVertices;
-
- private double dampingFactor;
-
- private double danglingRankFactor;
-
-
- private Record accumulator = new Record();
-
- private final DoubleValue newRank = new DoubleValue();
-
- private BooleanValue isDangling = new BooleanValue();
-
- private LongValue vertexID = new LongValue();
-
- private DoubleValue doubleInstance = new DoubleValue();
-
- @Override
- public void open(Configuration parameters) throws Exception {
- int currentIteration = getIterationRuntimeContext().getSuperstepNumber();
-
- numVertices = ConfigUtils.asLong(NUM_VERTICES_PARAMETER, parameters);
- numDanglingVertices = ConfigUtils.asLong(NUM_DANGLING_VERTICES_PARAMETER, parameters);
-
- dampingFactor = (1d - BETA) / (double) numVertices;
-
- aggregator = getIterationRuntimeContext().getIterationAggregator(AGGREGATOR_NAME);
-
- if (currentIteration == 1) {
- danglingRankFactor = BETA * (double) numDanglingVertices / ((double) numVertices * (double) numVertices);
- } else {
- PageRankStats previousAggregate = getIterationRuntimeContext().getPreviousIterationAggregate(AGGREGATOR_NAME);
- danglingRankFactor = BETA * previousAggregate.danglingRank() / (double) numVertices;
- }
- }
-
- @Override
- public void coGroup(Iterator<Record> currentPageRankIterator, Iterator<Record> partialRanks,
- Collector<Record> collector)
- {
- if (!currentPageRankIterator.hasNext()) {
- long missingVertex = partialRanks.next().getField(0, LongValue.class).getValue();
- throw new IllegalStateException("No current page rank for vertex [" + missingVertex + "]!");
- }
-
- Record currentPageRank = currentPageRankIterator.next();
-
- long edges = 0;
- double summedRank = 0;
- while (partialRanks.hasNext()) {
- summedRank += partialRanks.next().getField(1, doubleInstance).getValue();
- edges++;
- }
-
- double rank = BETA * summedRank + dampingFactor + danglingRankFactor;
- double currentRank = currentPageRank.getField(1, doubleInstance).getValue();
- isDangling = currentPageRank.getField(2, isDangling);
-
- // maintain statistics to compensate for probability loss on dangling nodes
- double danglingRankToAggregate = isDangling.get() ? rank : 0;
- long danglingVerticesToAggregate = isDangling.get() ? 1 : 0;
- double diff = Math.abs(currentRank - rank);
- aggregator.aggregate(diff, rank, danglingRankToAggregate, danglingVerticesToAggregate, 1, edges);
-
- // return the new record
- newRank.setValue(rank);
- accumulator.setField(0, currentPageRank.getField(0, vertexID));
- accumulator.setField(1, newRank);
- accumulator.setField(2, isDangling);
- collector.collect(accumulator);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/DotProductMatch.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/DotProductMatch.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/DotProductMatch.java
deleted file mode 100644
index 339cef5..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/DotProductMatch.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.recordJobs.graph.pageRankUtil;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.java.record.functions.JoinFunction;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-/**
- * In schema is_
- * INPUT = (pageId, rank, dangling), (pageId, neighbors-list).
- * OUTPUT = (targetPageId, partialRank)
- */
-@SuppressWarnings("deprecation")
-public class DotProductMatch extends JoinFunction implements Serializable {
- private static final long serialVersionUID = 1L;
-
- private Record record = new Record();
- private LongValue vertexID = new LongValue();
- private DoubleValue partialRank = new DoubleValue();
- private DoubleValue rank = new DoubleValue();
-
- private LongArrayView adjacentNeighbors = new LongArrayView();
-
- @Override
- public void join(Record pageWithRank, Record adjacencyList, Collector<Record> collector) throws Exception {
-
- rank = pageWithRank.getField(1, rank);
- adjacentNeighbors = adjacencyList.getField(1, adjacentNeighbors);
- int numNeighbors = adjacentNeighbors.size();
-
- double rankToDistribute = rank.getValue() / (double) numNeighbors;
-
- partialRank.setValue(rankToDistribute);
- record.setField(1, partialRank);
-
- for (int n = 0; n < numNeighbors; n++) {
- vertexID.setValue(adjacentNeighbors.getQuick(n));
- record.setField(0, vertexID);
- collector.collect(record);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/ImprovedAdjacencyListInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/ImprovedAdjacencyListInputFormat.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/ImprovedAdjacencyListInputFormat.java
deleted file mode 100644
index 6db4122..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/ImprovedAdjacencyListInputFormat.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.recordJobs.graph.pageRankUtil;
-
-import org.apache.flink.api.java.record.io.TextInputFormat;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.Record;
-
-public class ImprovedAdjacencyListInputFormat extends TextInputFormat {
- private static final long serialVersionUID = 1L;
-
- private final LongValue vertexID = new LongValue();
-
- private final AsciiLongArrayView arrayView = new AsciiLongArrayView();
-
- private final LongArrayView adjacentVertices = new LongArrayView();
-
- @Override
- public Record readRecord(Record target, byte[] bytes, int offset, int numBytes) {
-
- if (numBytes == 0) {
- return null;
- }
-
- arrayView.set(bytes, offset, numBytes);
-
- int numElements = arrayView.numElements();
- adjacentVertices.allocate(numElements - 1);
-
- try {
-
- int pos = 0;
- while (arrayView.next()) {
-
- if (pos == 0) {
- vertexID.setValue(arrayView.element());
- } else {
- adjacentVertices.setQuick(pos - 1, arrayView.element());
- }
-
- pos++;
- }
-
- // sanity check
- if (pos != numElements) {
- throw new IllegalStateException("Should have gotten " + numElements + " elements, but saw " + pos);
- }
-
- } catch (RuntimeException e) {
- throw new RuntimeException("Error parsing: " + arrayView.toString(), e);
- }
-
- target.clear();
- target.addField(vertexID);
- target.addField(adjacentVertices);
-
- return target;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/LongArrayView.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/LongArrayView.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/LongArrayView.java
deleted file mode 100644
index 770274b..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/LongArrayView.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.recordJobs.graph.pageRankUtil;
-
-import java.io.IOException;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.types.Value;
-
-public class LongArrayView implements Value {
- private static final long serialVersionUID = 1L;
-
- private long[] entries = new long[0];
-
- private int numEntries = 0;
-
- public LongArrayView() {
- }
-
- public long get(int index) {
- if (index >= numEntries) {
- throw new ArrayIndexOutOfBoundsException();
- }
- return getQuick(index);
- }
-
- public long getQuick(int index) {
- return entries[index];
- }
-
- public void allocate(int numEntries) {
- this.numEntries = numEntries;
- ensureCapacity();
- }
-
- public void set(int index, long value) {
- if (index >= numEntries) {
- throw new ArrayIndexOutOfBoundsException();
- }
- setQuick(index, value);
- }
-
- public void setQuick(int index, long value) {
- entries[index] = value;
- }
-
- public int size() {
- return numEntries;
- }
-
- private void ensureCapacity() {
- if (entries.length < numEntries) {
- entries = new long[numEntries];
- }
- }
-
- public void write(DataOutputView out) throws IOException {
- out.writeInt(numEntries);
- for (int n = 0; n < numEntries; n++) {
- out.writeLong(entries[n]);
- }
- }
-
- public void read(DataInputView in) throws IOException {
- numEntries = in.readInt();
- ensureCapacity();
- for (int n = 0; n < numEntries; n++) {
- entries[n] = in.readLong();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/PageRankStats.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/PageRankStats.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/PageRankStats.java
deleted file mode 100644
index c6d06f5..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/PageRankStats.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.recordJobs.graph.pageRankUtil;
-
-import java.io.IOException;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.types.Value;
-
-public class PageRankStats implements Value {
- private static final long serialVersionUID = 1L;
-
- private double diff;
-
- private double rank;
-
- private double danglingRank;
-
- private long numDanglingVertices;
-
- private long numVertices;
-
- private long edges;
-
- public PageRankStats() {
- }
-
- public PageRankStats(double diff, double rank, double danglingRank, long numDanglingVertices, long numVertices, long edges) {
- this.diff = diff;
- this.rank = rank;
- this.danglingRank = danglingRank;
- this.numDanglingVertices = numDanglingVertices;
- this.numVertices = numVertices;
- this.edges = edges;
- }
-
- public double diff() {
- return diff;
- }
-
- public double rank() {
- return rank;
- }
-
- public double danglingRank() {
- return danglingRank;
- }
-
- public long numDanglingVertices() {
- return numDanglingVertices;
- }
-
- public long numVertices() {
- return numVertices;
- }
-
- public long edges() {
- return edges;
- }
-
- @Override
- public void write(DataOutputView out) throws IOException {
- out.writeDouble(diff);
- out.writeDouble(rank);
- out.writeDouble(danglingRank);
- out.writeLong(numDanglingVertices);
- out.writeLong(numVertices);
- out.writeLong(edges);
- }
-
- @Override
- public void read(DataInputView in) throws IOException {
- diff = in.readDouble();
- rank = in.readDouble();
- danglingRank = in.readDouble();
- numDanglingVertices = in.readLong();
- numVertices = in.readLong();
- edges = in.readLong();
- }
-
- @Override
- public String toString() {
- return "PageRankStats: diff [" + diff + "], rank [" + rank + "], danglingRank [" + danglingRank +
- "], numDanglingVertices [" + numDanglingVertices + "], numVertices [" + numVertices + "], edges [" + edges +
- "]";
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/PageRankStatsAggregator.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/PageRankStatsAggregator.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/PageRankStatsAggregator.java
deleted file mode 100644
index 1457493..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/PageRankStatsAggregator.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.recordJobs.graph.pageRankUtil;
-
-import org.apache.flink.api.common.aggregators.Aggregator;
-
-@SuppressWarnings("serial")
-public class PageRankStatsAggregator implements Aggregator<PageRankStats> {
-
- private double diff = 0;
-
- private double rank = 0;
-
- private double danglingRank = 0;
-
- private long numDanglingVertices = 0;
-
- private long numVertices = 0;
-
- private long edges = 0;
-
- @Override
- public PageRankStats getAggregate() {
- return new PageRankStats(diff, rank, danglingRank, numDanglingVertices, numVertices, edges);
- }
-
- public void aggregate(double diffDelta, double rankDelta, double danglingRankDelta, long danglingVerticesDelta,
- long verticesDelta, long edgesDelta) {
- diff += diffDelta;
- rank += rankDelta;
- danglingRank += danglingRankDelta;
- numDanglingVertices += danglingVerticesDelta;
- numVertices += verticesDelta;
- edges += edgesDelta;
- }
-
- @Override
- public void aggregate(PageRankStats pageRankStats) {
- diff += pageRankStats.diff();
- rank += pageRankStats.rank();
- danglingRank += pageRankStats.danglingRank();
- numDanglingVertices += pageRankStats.numDanglingVertices();
- numVertices += pageRankStats.numVertices();
- edges += pageRankStats.edges();
- }
-
- @Override
- public void reset() {
- diff = 0;
- rank = 0;
- danglingRank = 0;
- numDanglingVertices = 0;
- numVertices = 0;
- edges = 0;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/PageWithRankOutFormat.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/PageWithRankOutFormat.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/PageWithRankOutFormat.java
deleted file mode 100644
index 6c6dc42..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/graph/pageRankUtil/PageWithRankOutFormat.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.recordJobs.graph.pageRankUtil;
-
-import org.apache.flink.api.java.record.io.DelimitedOutputFormat;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.Record;
-
-public class PageWithRankOutFormat extends DelimitedOutputFormat {
- private static final long serialVersionUID = 1L;
-
- private final StringBuilder buffer = new StringBuilder();
-
- @Override
- public int serializeRecord(Record record, byte[] target) {
- StringBuilder buffer = this.buffer;
-
- buffer.setLength(0);
- buffer.append(record.getField(0, LongValue.class).toString());
- buffer.append('\t');
- buffer.append(record.getField(1, DoubleValue.class).toString());
- buffer.append('\n');
-
- if (target.length < buffer.length()) {
- return -buffer.length();
- }
-
- for (int i = 0; i < buffer.length(); i++) {
- target[i] = (byte) buffer.charAt(i);
- }
- return buffer.length();
- }
-}