You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/30 23:44:39 UTC
[4/5] flink git commit: [FLINK-1681] [tests] Remove outdated 'nephele' iteration tests.
http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java
deleted file mode 100644
index a6e6b6e..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java
+++ /dev/null
@@ -1,315 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.iterative.nephele.customdanglingpagerank;
-
-import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
-import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
-import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.api.java.record.io.FileOutputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.iterative.task.IterationHeadPactTask;
-import org.apache.flink.runtime.iterative.task.IterationIntermediatePactTask;
-import org.apache.flink.runtime.iterative.task.IterationTailPactTask;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.DistributionPattern;
-import org.apache.flink.runtime.jobgraph.InputFormatVertex;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.OutputFormatVertex;
-import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.operators.BuildSecondCachedJoinDriver;
-import org.apache.flink.runtime.operators.CoGroupDriver;
-import org.apache.flink.runtime.operators.CollectorMapDriver;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.apache.flink.runtime.operators.util.TaskConfig;
-import org.apache.flink.test.iterative.nephele.JobGraphUtils;
-import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithAdjacencyList;
-import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithAdjacencyListComparatorFactory;
-import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithAdjacencyListSerializerFactory;
-import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRank;
-import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRankAndDangling;
-import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRankAndDanglingComparatorFactory;
-import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRankAndDanglingSerializerFactory;
-import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRankComparatorFactory;
-import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRankDanglingToVertexWithAdjacencyListPairComparatorFactory;
-import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRankDanglingToVertexWithRankPairComparatorFactory;
-import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRankSerializerFactory;
-import org.apache.flink.test.iterative.nephele.danglingpagerank.DiffL1NormConvergenceCriterion;
-import org.apache.flink.test.iterative.nephele.danglingpagerank.PageRankStatsAggregator;
-import org.apache.flink.test.util.TestBaseUtils;
-
-public class CustomCompensatableDanglingPageRank {
-
- private static final int NUM_FILE_HANDLES_PER_SORT = 64;
-
- private static final float SORT_SPILL_THRESHOLD = 0.85f;
-
- private static final int ITERATION_ID = 1;
-
-
- private static TypeSerializerFactory<VertexWithRank> vertexWithRankSerializer = new VertexWithRankSerializerFactory();
-
- private static TypeSerializerFactory<VertexWithRankAndDangling> vertexWithRankAndDanglingSerializer = new VertexWithRankAndDanglingSerializerFactory();
-
- private static TypeSerializerFactory<VertexWithAdjacencyList> vertexWithAdjacencyListSerializer = new VertexWithAdjacencyListSerializerFactory();
-
- private static TypeComparatorFactory<VertexWithRank> vertexWithRankComparator = new VertexWithRankComparatorFactory();
-
- private static TypeComparatorFactory<VertexWithRankAndDangling> vertexWithRankAndDanglingComparator = new VertexWithRankAndDanglingComparatorFactory();
-
- private static TypeComparatorFactory<VertexWithAdjacencyList> vertexWithAdjacencyListComparator = new VertexWithAdjacencyListComparatorFactory();
-
- private static TypePairComparatorFactory<VertexWithRankAndDangling, VertexWithAdjacencyList> matchComparator =
- new VertexWithRankDanglingToVertexWithAdjacencyListPairComparatorFactory();
-
- private static TypePairComparatorFactory<VertexWithRankAndDangling, VertexWithRank> coGroupComparator =
- new VertexWithRankDanglingToVertexWithRankPairComparatorFactory();
-
-
-// public static void main(String[] args) throws Exception {
-// String confPath = args.length >= 6 ? confPath = args[5] : PlayConstants.PLAY_DIR + "local-conf";
-//
-// GlobalConfiguration.loadConfiguration(confPath);
-// Configuration conf = GlobalConfiguration.getConfiguration();
-//
-// JobGraph jobGraph = getJobGraph(args);
-// JobGraphUtils.submit(jobGraph, conf);
-// }
-
- public static JobGraph getJobGraph(String[] args) throws Exception {
-
- int parallelism = 2;
- String pageWithRankInputPath = ""; //"file://" + PlayConstants.PLAY_DIR + "test-inputs/danglingpagerank/pageWithRank";
- String adjacencyListInputPath = ""; //"file://" + PlayConstants.PLAY_DIR +
-// "test-inputs/danglingpagerank/adjacencylists";
- String outputPath = TestBaseUtils.constructTestURI(CustomCompensatableDanglingPageRank.class, "flink_iterations");
-// String confPath = PlayConstants.PLAY_DIR + "local-conf";
- int minorConsumer = 2;
- int matchMemory = 5;
- int coGroupSortMemory = 5;
- int numIterations = 25;
- long numVertices = 5;
- long numDanglingVertices = 1;
-
- String failingWorkers = "1";
- int failingIteration = 2;
- double messageLoss = 0.75;
-
- if (args.length >= 14) {
- parallelism = Integer.parseInt(args[0]);
- pageWithRankInputPath = args[1];
- adjacencyListInputPath = args[2];
- outputPath = args[3];
-// confPath = args[4];
- minorConsumer = Integer.parseInt(args[5]);
- matchMemory = Integer.parseInt(args[6]);
- coGroupSortMemory = Integer.parseInt(args[7]);
- numIterations = Integer.parseInt(args[8]);
- numVertices = Long.parseLong(args[9]);
- numDanglingVertices = Long.parseLong(args[10]);
- failingWorkers = args[11];
- failingIteration = Integer.parseInt(args[12]);
- messageLoss = Double.parseDouble(args[13]);
- }
-
- int totalMemoryConsumption = 3*minorConsumer + matchMemory + coGroupSortMemory;
-
- JobGraph jobGraph = new JobGraph("CompensatableDanglingPageRank");
-
- // --------------- the inputs ---------------------
-
- // page rank input
- InputFormatVertex pageWithRankInput = JobGraphUtils.createInput(new CustomImprovedDanglingPageRankInputFormat(),
- pageWithRankInputPath, "DanglingPageWithRankInput", jobGraph, parallelism);
- TaskConfig pageWithRankInputConfig = new TaskConfig(pageWithRankInput.getConfiguration());
- pageWithRankInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
- pageWithRankInputConfig.setOutputComparator(vertexWithRankAndDanglingComparator, 0);
- pageWithRankInputConfig.setOutputSerializer(vertexWithRankAndDanglingSerializer);
- pageWithRankInputConfig.setStubParameter("pageRank.numVertices", String.valueOf(numVertices));
-
- // edges as adjacency list
- InputFormatVertex adjacencyListInput = JobGraphUtils.createInput(new CustomImprovedAdjacencyListInputFormat(),
- adjacencyListInputPath, "AdjancencyListInput", jobGraph, parallelism);
- TaskConfig adjacencyListInputConfig = new TaskConfig(adjacencyListInput.getConfiguration());
- adjacencyListInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
- adjacencyListInputConfig.setOutputSerializer(vertexWithAdjacencyListSerializer);
- adjacencyListInputConfig.setOutputComparator(vertexWithAdjacencyListComparator, 0);
-
- // --------------- the head ---------------------
- JobVertex head = JobGraphUtils.createTask(IterationHeadPactTask.class, "IterationHead", jobGraph,
- parallelism);
- TaskConfig headConfig = new TaskConfig(head.getConfiguration());
- headConfig.setIterationId(ITERATION_ID);
-
- // initial input / partial solution
- headConfig.addInputToGroup(0);
- headConfig.setIterationHeadPartialSolutionOrWorksetInputIndex(0);
- headConfig.setInputSerializer(vertexWithRankAndDanglingSerializer, 0);
- headConfig.setInputComparator(vertexWithRankAndDanglingComparator, 0);
- headConfig.setInputLocalStrategy(0, LocalStrategy.SORT);
- headConfig.setRelativeMemoryInput(0, (double) minorConsumer / totalMemoryConsumption);
- headConfig.setFilehandlesInput(0, NUM_FILE_HANDLES_PER_SORT);
- headConfig.setSpillingThresholdInput(0, SORT_SPILL_THRESHOLD);
-
- // back channel / iterations
- headConfig.setRelativeBackChannelMemory((double) minorConsumer / totalMemoryConsumption);
-
- // output into iteration
- headConfig.setOutputSerializer(vertexWithRankAndDanglingSerializer);
- headConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
- headConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
-
- // final output
- TaskConfig headFinalOutConfig = new TaskConfig(new Configuration());
- headFinalOutConfig.setOutputSerializer(vertexWithRankAndDanglingSerializer);
- headFinalOutConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
- headConfig.setIterationHeadFinalOutputConfig(headFinalOutConfig);
-
- // the sync
- headConfig.setIterationHeadIndexOfSyncOutput(3);
- headConfig.setNumberOfIterations(numIterations);
-
- // the driver
- headConfig.setDriver(CollectorMapDriver.class);
- headConfig.setDriverStrategy(DriverStrategy.COLLECTOR_MAP);
- headConfig.setStubWrapper(new UserCodeClassWrapper<CustomCompensatingMap>(CustomCompensatingMap.class));
- headConfig.setStubParameter("pageRank.numVertices", String.valueOf(numVertices));
- headConfig.setStubParameter("compensation.failingWorker", failingWorkers);
- headConfig.setStubParameter("compensation.failingIteration", String.valueOf(failingIteration));
- headConfig.setStubParameter("compensation.messageLoss", String.valueOf(messageLoss));
- headConfig.addIterationAggregator(CustomCompensatableDotProductCoGroup.AGGREGATOR_NAME, new PageRankStatsAggregator());
-
- // --------------- the join ---------------------
-
- JobVertex intermediate = JobGraphUtils.createTask(IterationIntermediatePactTask.class,
- "IterationIntermediate", jobGraph, parallelism);
- TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration());
- intermediateConfig.setIterationId(ITERATION_ID);
-// intermediateConfig.setDriver(RepeatableHashjoinMatchDriverWithCachedBuildside.class);
- intermediateConfig.setDriver(BuildSecondCachedJoinDriver.class);
- intermediateConfig.setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
- intermediateConfig.setRelativeMemoryDriver((double) matchMemory / totalMemoryConsumption);
- intermediateConfig.addInputToGroup(0);
- intermediateConfig.addInputToGroup(1);
- intermediateConfig.setInputSerializer(vertexWithRankAndDanglingSerializer, 0);
- intermediateConfig.setInputSerializer(vertexWithAdjacencyListSerializer, 1);
- intermediateConfig.setDriverComparator(vertexWithRankAndDanglingComparator, 0);
- intermediateConfig.setDriverComparator(vertexWithAdjacencyListComparator, 1);
- intermediateConfig.setDriverPairComparator(matchComparator);
-
- intermediateConfig.setOutputSerializer(vertexWithRankSerializer);
- intermediateConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
- intermediateConfig.setOutputComparator(vertexWithRankComparator, 0);
-
- intermediateConfig.setStubWrapper(new UserCodeClassWrapper<CustomCompensatableDotProductMatch>(CustomCompensatableDotProductMatch.class));
- intermediateConfig.setStubParameter("pageRank.numVertices", String.valueOf(numVertices));
- intermediateConfig.setStubParameter("compensation.failingWorker", failingWorkers);
- intermediateConfig.setStubParameter("compensation.failingIteration", String.valueOf(failingIteration));
- intermediateConfig.setStubParameter("compensation.messageLoss", String.valueOf(messageLoss));
-
- // ---------------- the tail (co group) --------------------
-
- JobVertex tail = JobGraphUtils.createTask(IterationTailPactTask.class, "IterationTail", jobGraph,
- parallelism);
- TaskConfig tailConfig = new TaskConfig(tail.getConfiguration());
- tailConfig.setIterationId(ITERATION_ID);
- tailConfig.setIsWorksetUpdate();
-
- // inputs and driver
- tailConfig.setDriver(CoGroupDriver.class);
- tailConfig.setDriverStrategy(DriverStrategy.CO_GROUP);
- tailConfig.addInputToGroup(0);
- tailConfig.addInputToGroup(1);
- tailConfig.setInputSerializer(vertexWithRankAndDanglingSerializer, 0);
- tailConfig.setInputSerializer(vertexWithRankSerializer, 1);
- tailConfig.setDriverComparator(vertexWithRankAndDanglingComparator, 0);
- tailConfig.setDriverComparator(vertexWithRankComparator, 1);
- tailConfig.setDriverPairComparator(coGroupComparator);
- tailConfig.setInputAsynchronouslyMaterialized(0, true);
- tailConfig.setRelativeInputMaterializationMemory(0, (double)minorConsumer/totalMemoryConsumption);
- tailConfig.setInputLocalStrategy(1, LocalStrategy.SORT);
- tailConfig.setInputComparator(vertexWithRankComparator, 1);
- tailConfig.setRelativeMemoryInput(1, (double) coGroupSortMemory / totalMemoryConsumption);
- tailConfig.setFilehandlesInput(1, NUM_FILE_HANDLES_PER_SORT);
- tailConfig.setSpillingThresholdInput(1, SORT_SPILL_THRESHOLD);
-
- // output
- tailConfig.setOutputSerializer(vertexWithRankAndDanglingSerializer);
-
- // the stub
- tailConfig.setStubWrapper(new UserCodeClassWrapper<CustomCompensatableDotProductCoGroup>(CustomCompensatableDotProductCoGroup.class));
- tailConfig.setStubParameter("pageRank.numVertices", String.valueOf(numVertices));
- tailConfig.setStubParameter("pageRank.numDanglingVertices", String.valueOf(numDanglingVertices));
- tailConfig.setStubParameter("compensation.failingWorker", failingWorkers);
- tailConfig.setStubParameter("compensation.failingIteration", String.valueOf(failingIteration));
- tailConfig.setStubParameter("compensation.messageLoss", String.valueOf(messageLoss));
-
- // --------------- the output ---------------------
-
- OutputFormatVertex output = JobGraphUtils.createFileOutput(jobGraph, "FinalOutput", parallelism);
- TaskConfig outputConfig = new TaskConfig(output.getConfiguration());
- outputConfig.addInputToGroup(0);
- outputConfig.setInputSerializer(vertexWithRankAndDanglingSerializer, 0);
- outputConfig.setStubWrapper(new UserCodeClassWrapper<CustomPageWithRankOutFormat>(CustomPageWithRankOutFormat.class));
- outputConfig.setStubParameter(FileOutputFormat.FILE_PARAMETER_KEY, outputPath);
-
- // --------------- the auxiliaries ---------------------
-
- JobVertex sync = JobGraphUtils.createSync(jobGraph, parallelism);
- TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
- syncConfig.setNumberOfIterations(numIterations);
- syncConfig.addIterationAggregator(CustomCompensatableDotProductCoGroup.AGGREGATOR_NAME, new PageRankStatsAggregator());
- syncConfig.setConvergenceCriterion(CustomCompensatableDotProductCoGroup.AGGREGATOR_NAME, new DiffL1NormConvergenceCriterion());
- syncConfig.setIterationId(ITERATION_ID);
-
- // --------------- the wiring ---------------------
-
- JobGraphUtils.connect(pageWithRankInput, head, DistributionPattern.ALL_TO_ALL);
-
- JobGraphUtils.connect(head, intermediate, DistributionPattern.POINTWISE);
- intermediateConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
-
- JobGraphUtils.connect(adjacencyListInput, intermediate, DistributionPattern.ALL_TO_ALL);
-
- JobGraphUtils.connect(head, tail, DistributionPattern.POINTWISE);
- JobGraphUtils.connect(intermediate, tail, DistributionPattern.ALL_TO_ALL);
- tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
- tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(1, parallelism);
-
- JobGraphUtils.connect(head, output, DistributionPattern.POINTWISE);
-
- JobGraphUtils.connect(head, sync, DistributionPattern.POINTWISE);
-
- SlotSharingGroup sharingGroup = new SlotSharingGroup();
- pageWithRankInput.setSlotSharingGroup(sharingGroup);
- adjacencyListInput.setSlotSharingGroup(sharingGroup);
- head.setSlotSharingGroup(sharingGroup);
- intermediate.setSlotSharingGroup(sharingGroup);
- tail.setSlotSharingGroup(sharingGroup);
- output.setSlotSharingGroup(sharingGroup);
- sync.setSlotSharingGroup(sharingGroup);
-
- tail.setStrictlyCoLocatedWith(head);
- intermediate.setStrictlyCoLocatedWith(head);
-
- return jobGraph;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java
deleted file mode 100644
index 0bf258f..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java
+++ /dev/null
@@ -1,329 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.iterative.nephele.customdanglingpagerank;
-
-import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
-import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
-import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.api.java.record.io.FileOutputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.iterative.task.IterationHeadPactTask;
-import org.apache.flink.runtime.iterative.task.IterationIntermediatePactTask;
-import org.apache.flink.runtime.iterative.task.IterationTailPactTask;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.DistributionPattern;
-import org.apache.flink.runtime.jobgraph.InputFormatVertex;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.OutputFormatVertex;
-import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.operators.BuildSecondCachedJoinDriver;
-import org.apache.flink.runtime.operators.CoGroupDriver;
-import org.apache.flink.runtime.operators.CollectorMapDriver;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.chaining.SynchronousChainedCombineDriver;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.apache.flink.runtime.operators.util.TaskConfig;
-import org.apache.flink.test.iterative.nephele.JobGraphUtils;
-import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithAdjacencyList;
-import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithAdjacencyListComparatorFactory;
-import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithAdjacencyListSerializerFactory;
-import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRank;
-import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRankAndDangling;
-import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRankAndDanglingComparatorFactory;
-import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRankAndDanglingSerializerFactory;
-import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRankComparatorFactory;
-import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRankDanglingToVertexWithAdjacencyListPairComparatorFactory;
-import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRankDanglingToVertexWithRankPairComparatorFactory;
-import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRankSerializerFactory;
-import org.apache.flink.test.iterative.nephele.danglingpagerank.DiffL1NormConvergenceCriterion;
-import org.apache.flink.test.iterative.nephele.danglingpagerank.PageRankStatsAggregator;
-import org.apache.flink.test.util.TestBaseUtils;
-
-public class CustomCompensatableDanglingPageRankWithCombiner {
-
- private static final int NUM_FILE_HANDLES_PER_SORT = 64;
-
- private static final float SORT_SPILL_THRESHOLD = 0.85f;
-
- private static final int ITERATION_ID = 1;
-
-
- private static TypeSerializerFactory<VertexWithRank> vertexWithRankSerializer = new VertexWithRankSerializerFactory();
-
- private static TypeSerializerFactory<VertexWithRankAndDangling> vertexWithRankAndDanglingSerializer = new VertexWithRankAndDanglingSerializerFactory();
-
- private static TypeSerializerFactory<VertexWithAdjacencyList> vertexWithAdjacencyListSerializer = new VertexWithAdjacencyListSerializerFactory();
-
- private static TypeComparatorFactory<VertexWithRank> vertexWithRankComparator = new VertexWithRankComparatorFactory();
-
- private static TypeComparatorFactory<VertexWithRankAndDangling> vertexWithRankAndDanglingComparator = new VertexWithRankAndDanglingComparatorFactory();
-
- private static TypeComparatorFactory<VertexWithAdjacencyList> vertexWithAdjacencyListComparator = new VertexWithAdjacencyListComparatorFactory();
-
- private static TypePairComparatorFactory<VertexWithRankAndDangling, VertexWithAdjacencyList> matchComparator =
- new VertexWithRankDanglingToVertexWithAdjacencyListPairComparatorFactory();
-
- private static TypePairComparatorFactory<VertexWithRankAndDangling, VertexWithRank> coGroupComparator =
- new VertexWithRankDanglingToVertexWithRankPairComparatorFactory();
-
-
-// public static void main(String[] args) throws Exception {
-// String confPath = args.length >= 6 ? confPath = args[5] : PlayConstants.PLAY_DIR + "local-conf";
-//
-// GlobalConfiguration.loadConfiguration(confPath);
-// Configuration conf = GlobalConfiguration.getConfiguration();
-//
-// JobGraph jobGraph = getJobGraph(args);
-// JobGraphUtils.submit(jobGraph, conf);
-// }
-
- public static JobGraph getJobGraph(String[] args) throws Exception {
-
- int parallelism = 2;
- String pageWithRankInputPath = ""; //"file://" + PlayConstants.PLAY_DIR + "test-inputs/danglingpagerank/pageWithRank";
- String adjacencyListInputPath = ""; //"file://" + PlayConstants.PLAY_DIR +
-// "test-inputs/danglingpagerank/adjacencylists";
- String outputPath = TestBaseUtils.constructTestURI(CustomCompensatableDanglingPageRankWithCombiner.class, "flink_iterations");
- int minorConsumer = 2;
- int matchMemory = 5;
- int coGroupSortMemory = 5;
- int numIterations = 25;
- long numVertices = 5;
- long numDanglingVertices = 1;
-
- String failingWorkers = "1";
- int failingIteration = 2;
- double messageLoss = 0.75;
-
- if (args.length >= 14) {
- parallelism = Integer.parseInt(args[0]);
- pageWithRankInputPath = args[1];
- adjacencyListInputPath = args[2];
- outputPath = args[3];
- // [4] is config path
- minorConsumer = Integer.parseInt(args[5]);
- matchMemory = Integer.parseInt(args[6]);
- coGroupSortMemory = Integer.parseInt(args[7]);
- numIterations = Integer.parseInt(args[8]);
- numVertices = Long.parseLong(args[9]);
- numDanglingVertices = Long.parseLong(args[10]);
- failingWorkers = args[11];
- failingIteration = Integer.parseInt(args[12]);
- messageLoss = Double.parseDouble(args[13]);
- }
-
- int totalMemoryConsumption = 3*minorConsumer + 2*coGroupSortMemory + matchMemory;
-
- JobGraph jobGraph = new JobGraph("CompensatableDanglingPageRank");
-
- // --------------- the inputs ---------------------
-
- // page rank input
- InputFormatVertex pageWithRankInput = JobGraphUtils.createInput(new CustomImprovedDanglingPageRankInputFormat(),
- pageWithRankInputPath, "DanglingPageWithRankInput", jobGraph, parallelism);
- TaskConfig pageWithRankInputConfig = new TaskConfig(pageWithRankInput.getConfiguration());
- pageWithRankInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
- pageWithRankInputConfig.setOutputComparator(vertexWithRankAndDanglingComparator, 0);
- pageWithRankInputConfig.setOutputSerializer(vertexWithRankAndDanglingSerializer);
- pageWithRankInputConfig.setStubParameter("pageRank.numVertices", String.valueOf(numVertices));
-
- // edges as adjacency list
- InputFormatVertex adjacencyListInput = JobGraphUtils.createInput(new CustomImprovedAdjacencyListInputFormat(),
- adjacencyListInputPath, "AdjancencyListInput", jobGraph, parallelism);
- TaskConfig adjacencyListInputConfig = new TaskConfig(adjacencyListInput.getConfiguration());
- adjacencyListInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
- adjacencyListInputConfig.setOutputSerializer(vertexWithAdjacencyListSerializer);
- adjacencyListInputConfig.setOutputComparator(vertexWithAdjacencyListComparator, 0);
-
- // --------------- the head ---------------------
- JobVertex head = JobGraphUtils.createTask(IterationHeadPactTask.class, "IterationHead", jobGraph,
- parallelism);
- TaskConfig headConfig = new TaskConfig(head.getConfiguration());
- headConfig.setIterationId(ITERATION_ID);
-
- // initial input / partial solution
- headConfig.addInputToGroup(0);
- headConfig.setIterationHeadPartialSolutionOrWorksetInputIndex(0);
- headConfig.setInputSerializer(vertexWithRankAndDanglingSerializer, 0);
- headConfig.setInputComparator(vertexWithRankAndDanglingComparator, 0);
- headConfig.setInputLocalStrategy(0, LocalStrategy.SORT);
- headConfig.setRelativeMemoryInput(0, (double)minorConsumer/totalMemoryConsumption);
- headConfig.setFilehandlesInput(0, NUM_FILE_HANDLES_PER_SORT);
- headConfig.setSpillingThresholdInput(0, SORT_SPILL_THRESHOLD);
-
- // back channel / iterations
- headConfig.setRelativeBackChannelMemory((double)minorConsumer/totalMemoryConsumption);
-
- // output into iteration
- headConfig.setOutputSerializer(vertexWithRankAndDanglingSerializer);
- headConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
- headConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
-
- // final output
- TaskConfig headFinalOutConfig = new TaskConfig(new Configuration());
- headFinalOutConfig.setOutputSerializer(vertexWithRankAndDanglingSerializer);
- headFinalOutConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
- headConfig.setIterationHeadFinalOutputConfig(headFinalOutConfig);
-
- // the sync
- headConfig.setIterationHeadIndexOfSyncOutput(3);
- headConfig.setNumberOfIterations(numIterations);
-
- // the driver
- headConfig.setDriver(CollectorMapDriver.class);
- headConfig.setDriverStrategy(DriverStrategy.COLLECTOR_MAP);
- headConfig.setStubWrapper(new UserCodeClassWrapper<CustomCompensatingMap>(CustomCompensatingMap.class));
- headConfig.setStubParameter("pageRank.numVertices", String.valueOf(numVertices));
- headConfig.setStubParameter("compensation.failingWorker", failingWorkers);
- headConfig.setStubParameter("compensation.failingIteration", String.valueOf(failingIteration));
- headConfig.setStubParameter("compensation.messageLoss", String.valueOf(messageLoss));
- headConfig.addIterationAggregator(CustomCompensatableDotProductCoGroup.AGGREGATOR_NAME, new PageRankStatsAggregator());
-
- // --------------- the join ---------------------
-
- JobVertex intermediate = JobGraphUtils.createTask(IterationIntermediatePactTask.class,
- "IterationIntermediate", jobGraph, parallelism);
- TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration());
- intermediateConfig.setIterationId(ITERATION_ID);
-// intermediateConfig.setDriver(RepeatableHashjoinMatchDriverWithCachedBuildside.class);
- intermediateConfig.setDriver(BuildSecondCachedJoinDriver.class);
- intermediateConfig.setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
- intermediateConfig.setRelativeMemoryDriver((double)matchMemory/totalMemoryConsumption);
- intermediateConfig.addInputToGroup(0);
- intermediateConfig.addInputToGroup(1);
- intermediateConfig.setInputSerializer(vertexWithRankAndDanglingSerializer, 0);
- intermediateConfig.setInputSerializer(vertexWithAdjacencyListSerializer, 1);
- intermediateConfig.setDriverComparator(vertexWithRankAndDanglingComparator, 0);
- intermediateConfig.setDriverComparator(vertexWithAdjacencyListComparator, 1);
- intermediateConfig.setDriverPairComparator(matchComparator);
-
- intermediateConfig.setOutputSerializer(vertexWithRankSerializer);
- intermediateConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
-
- intermediateConfig.setStubWrapper(new UserCodeClassWrapper<CustomCompensatableDotProductMatch>(CustomCompensatableDotProductMatch.class));
- intermediateConfig.setStubParameter("pageRank.numVertices", String.valueOf(numVertices));
- intermediateConfig.setStubParameter("compensation.failingWorker", failingWorkers);
- intermediateConfig.setStubParameter("compensation.failingIteration", String.valueOf(failingIteration));
- intermediateConfig.setStubParameter("compensation.messageLoss", String.valueOf(messageLoss));
-
- // the combiner and the output
- TaskConfig combinerConfig = new TaskConfig(new Configuration());
- combinerConfig.addInputToGroup(0);
- combinerConfig.setInputSerializer(vertexWithRankSerializer, 0);
- combinerConfig.setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE);
- combinerConfig.setDriverComparator(vertexWithRankComparator, 0);
- combinerConfig.setDriverComparator(vertexWithRankComparator, 1);
- combinerConfig.setRelativeMemoryDriver((double)coGroupSortMemory/totalMemoryConsumption);
- combinerConfig.setOutputSerializer(vertexWithRankSerializer);
- combinerConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
- combinerConfig.setOutputComparator(vertexWithRankComparator, 0);
- combinerConfig.setStubWrapper(new UserCodeClassWrapper<CustomRankCombiner>(CustomRankCombiner.class));
- intermediateConfig.addChainedTask(SynchronousChainedCombineDriver.class, combinerConfig, "Combiner");
-
- // ---------------- the tail (co group) --------------------
-
- JobVertex tail = JobGraphUtils.createTask(IterationTailPactTask.class, "IterationTail", jobGraph,
- parallelism);
- TaskConfig tailConfig = new TaskConfig(tail.getConfiguration());
- tailConfig.setIterationId(ITERATION_ID);
- tailConfig.setIsWorksetUpdate();
-
- // inputs and driver
- tailConfig.setDriver(CoGroupDriver.class);
- tailConfig.setDriverStrategy(DriverStrategy.CO_GROUP);
- tailConfig.addInputToGroup(0);
- tailConfig.addInputToGroup(1);
- tailConfig.setInputSerializer(vertexWithRankAndDanglingSerializer, 0);
- tailConfig.setInputSerializer(vertexWithRankSerializer, 1);
- tailConfig.setDriverComparator(vertexWithRankAndDanglingComparator, 0);
- tailConfig.setDriverComparator(vertexWithRankComparator, 1);
- tailConfig.setDriverPairComparator(coGroupComparator);
- tailConfig.setInputAsynchronouslyMaterialized(0, true);
- tailConfig.setRelativeInputMaterializationMemory(0, (double)minorConsumer/totalMemoryConsumption);
- tailConfig.setInputLocalStrategy(1, LocalStrategy.SORT);
- tailConfig.setInputComparator(vertexWithRankComparator, 1);
- tailConfig.setRelativeMemoryInput(1, (double)coGroupSortMemory/totalMemoryConsumption);
- tailConfig.setFilehandlesInput(1, NUM_FILE_HANDLES_PER_SORT);
- tailConfig.setSpillingThresholdInput(1, SORT_SPILL_THRESHOLD);
- tailConfig.addIterationAggregator(CustomCompensatableDotProductCoGroup.AGGREGATOR_NAME, new PageRankStatsAggregator());
-
- // output
- tailConfig.setOutputSerializer(vertexWithRankAndDanglingSerializer);
-
- // the stub
- tailConfig.setStubWrapper(new UserCodeClassWrapper<CustomCompensatableDotProductCoGroup>(CustomCompensatableDotProductCoGroup.class));
- tailConfig.setStubParameter("pageRank.numVertices", String.valueOf(numVertices));
- tailConfig.setStubParameter("pageRank.numDanglingVertices", String.valueOf(numDanglingVertices));
- tailConfig.setStubParameter("compensation.failingWorker", failingWorkers);
- tailConfig.setStubParameter("compensation.failingIteration", String.valueOf(failingIteration));
- tailConfig.setStubParameter("compensation.messageLoss", String.valueOf(messageLoss));
-
- // --------------- the output ---------------------
-
- OutputFormatVertex output = JobGraphUtils.createFileOutput(jobGraph, "FinalOutput", parallelism);
- TaskConfig outputConfig = new TaskConfig(output.getConfiguration());
- outputConfig.addInputToGroup(0);
- outputConfig.setInputSerializer(vertexWithRankAndDanglingSerializer, 0);
- outputConfig.setStubWrapper(new UserCodeClassWrapper<CustomPageWithRankOutFormat>(CustomPageWithRankOutFormat.class));
- outputConfig.setStubParameter(FileOutputFormat.FILE_PARAMETER_KEY, outputPath);
-
- // --------------- the auxiliaries ---------------------
-
- JobVertex sync = JobGraphUtils.createSync(jobGraph, parallelism);
- TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
- syncConfig.setNumberOfIterations(numIterations);
- syncConfig.addIterationAggregator(CustomCompensatableDotProductCoGroup.AGGREGATOR_NAME, new PageRankStatsAggregator());
- syncConfig.setConvergenceCriterion(CustomCompensatableDotProductCoGroup.AGGREGATOR_NAME, new DiffL1NormConvergenceCriterion());
- syncConfig.setIterationId(ITERATION_ID);
-
- // --------------- the wiring ---------------------
-
- JobGraphUtils.connect(pageWithRankInput, head, DistributionPattern.ALL_TO_ALL);
-
- JobGraphUtils.connect(head, intermediate, DistributionPattern.POINTWISE);
- intermediateConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
-
- JobGraphUtils.connect(adjacencyListInput, intermediate, DistributionPattern.ALL_TO_ALL);
-
- JobGraphUtils.connect(head, tail, DistributionPattern.POINTWISE);
- JobGraphUtils.connect(intermediate, tail, DistributionPattern.ALL_TO_ALL);
- tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
- tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(1, parallelism);
-
- JobGraphUtils.connect(head, output, DistributionPattern.POINTWISE);
-
- JobGraphUtils.connect(head, sync, DistributionPattern.POINTWISE);
-
- SlotSharingGroup sharingGroup = new SlotSharingGroup();
- pageWithRankInput.setSlotSharingGroup(sharingGroup);
- adjacencyListInput.setSlotSharingGroup(sharingGroup);
- head.setSlotSharingGroup(sharingGroup);
- intermediate.setSlotSharingGroup(sharingGroup);
- tail.setSlotSharingGroup(sharingGroup);
- output.setSlotSharingGroup(sharingGroup);
- sync.setSlotSharingGroup(sharingGroup);
-
- tail.setStrictlyCoLocatedWith(head);
- intermediate.setStrictlyCoLocatedWith(head);
-
- return jobGraph;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductCoGroup.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductCoGroup.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductCoGroup.java
deleted file mode 100644
index faf777e..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductCoGroup.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.iterative.nephele.customdanglingpagerank;
-
-import java.util.Iterator;
-import java.util.Set;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.common.functions.CoGroupFunction;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.iterative.nephele.ConfigUtils;
-import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRank;
-import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRankAndDangling;
-import org.apache.flink.test.iterative.nephele.danglingpagerank.PageRankStats;
-import org.apache.flink.test.iterative.nephele.danglingpagerank.PageRankStatsAggregator;
-import org.apache.flink.util.Collector;
-
-/**
- * Co-group function that derives the new rank of a vertex from its current rank record
- * (first input) and the partial ranks sent to it (second input), and reports per-vertex
- * statistics to the iteration aggregator registered under {@link #AGGREGATOR_NAME}.
- *
- * <p>When the current superstep equals the configured failing iteration and this subtask
- * is one of the configured failing workers, the aggregator is reset in {@link #close()}.
- */
-public class CustomCompensatableDotProductCoGroup extends AbstractRichFunction
-        implements CoGroupFunction<VertexWithRankAndDangling, VertexWithRank, VertexWithRankAndDangling> {
-
-    private static final long serialVersionUID = 1L;
-
-    /** Name under which the statistics aggregator is registered with the iteration. */
-    public static final String AGGREGATOR_NAME = "pagerank.aggregator";
-
-    /** Factor applied to the summed partial ranks. */
-    private static final double BETA = 0.85;
-
-    /** Reused output record; overwritten on every {@code coGroup} call. */
-    private VertexWithRankAndDangling accumulator = new VertexWithRankAndDangling();
-
-    private PageRankStatsAggregator aggregator;
-
-    private long numVertices;
-
-    private long numDanglingVertices;
-
-    private double dampingFactor;
-
-    private double danglingRankFactor;
-
-    private int workerIndex;
-
-    private int currentIteration;
-
-    private int failingIteration;
-
-    private Set<Integer> failingWorkers;
-
-    /**
-     * Reads the subtask index, the superstep number and the stub parameters, and
-     * pre-computes the two additive rank terms used by every {@code coGroup} call.
-     *
-     * @param parameters configuration holding the "compensation.*" and "pageRank.*" keys
-     */
-    @Override
-    public void open(Configuration parameters) throws Exception {
-        workerIndex = getRuntimeContext().getIndexOfThisSubtask();
-        currentIteration = getIterationRuntimeContext().getSuperstepNumber();
-
-        failingIteration = ConfigUtils.asInteger("compensation.failingIteration", parameters);
-        failingWorkers = ConfigUtils.asIntSet("compensation.failingWorker", parameters);
-        numVertices = ConfigUtils.asLong("pageRank.numVertices", parameters);
-        numDanglingVertices = ConfigUtils.asLong("pageRank.numDanglingVertices", parameters);
-
-        final double vertexCount = (double) numVertices;
-        dampingFactor = (1d - BETA) / vertexCount;
-
-        aggregator = getIterationRuntimeContext().getIterationAggregator(AGGREGATOR_NAME);
-
-        if (currentIteration != 1) {
-            // later supersteps take the dangling rank from the previous superstep's aggregate
-            PageRankStats lastStats = getIterationRuntimeContext().getPreviousIterationAggregate(AGGREGATOR_NAME);
-            danglingRankFactor = BETA * lastStats.danglingRank() / vertexCount;
-        } else {
-            // first superstep: no previous aggregate exists, derive the term from the vertex counts
-            danglingRankFactor = BETA * (double) numDanglingVertices / (vertexCount * vertexCount);
-        }
-    }
-
-    /**
-     * Emits the updated rank record for one vertex and feeds its statistics to the aggregator.
-     *
-     * @param currentPageRankIterable current rank record(s) of the vertex; only the first is used
-     * @param partialRanks partial ranks contributed to the vertex
-     * @param collector receives the reused output record
-     * @throws IllegalStateException if there is no current rank record for the vertex
-     */
-    @Override
-    public void coGroup(Iterable<VertexWithRankAndDangling> currentPageRankIterable, Iterable<VertexWithRank> partialRanks,
-            Collector<VertexWithRankAndDangling> collector)
-    {
-        final Iterator<VertexWithRankAndDangling> currentRanks = currentPageRankIterable.iterator();
-
-        if (!currentRanks.hasNext()) {
-            final VertexWithRank orphan = partialRanks.iterator().next();
-            long missingVertex = orphan.getVertexID();
-            throw new IllegalStateException("No current page rank for vertex [" + missingVertex + "]!");
-        }
-
-        final VertexWithRankAndDangling currentPageRank = currentRanks.next();
-
-        // sum the contributions and count them
-        long edges = 0;
-        double summedRank = 0;
-        final Iterator<VertexWithRank> contributions = partialRanks.iterator();
-        while (contributions.hasNext()) {
-            summedRank += contributions.next().getRank();
-            edges++;
-        }
-
-        final double rank = BETA * summedRank + dampingFactor + danglingRankFactor;
-
-        final double currentRank = currentPageRank.getRank();
-        final boolean isDangling = currentPageRank.isDangling();
-
-        // only dangling vertices contribute to the dangling statistics
-        double danglingRankToAggregate = 0;
-        long danglingVerticesToAggregate = 0;
-        if (isDangling) {
-            danglingRankToAggregate = rank;
-            danglingVerticesToAggregate = 1;
-        }
-
-        final double diff = Math.abs(currentRank - rank);
-
-        aggregator.aggregate(diff, rank, danglingRankToAggregate, danglingVerticesToAggregate, 1, edges, summedRank, 0);
-
-        accumulator.setVertexID(currentPageRank.getVertexID());
-        accumulator.setRank(rank);
-        accumulator.setDangling(isDangling);
-
-        collector.collect(accumulator);
-    }
-
-    /**
-     * Resets the aggregator when this subtask is a failing worker in the failing iteration.
-     */
-    @Override
-    public void close() throws Exception {
-        final boolean inFailingIteration = currentIteration == failingIteration;
-        if (inFailingIteration && failingWorkers.contains(workerIndex)) {
-            aggregator.reset();
-        }
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductMatch.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductMatch.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductMatch.java
deleted file mode 100644
index cc20aba..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductMatch.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.iterative.nephele.customdanglingpagerank;
-
-import java.util.Random;
-import java.util.Set;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.iterative.nephele.ConfigUtils;
-import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithAdjacencyList;
-import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRank;
-import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRankAndDangling;
-import org.apache.flink.util.Collector;
-
-/**
- * Join function that splits the rank of a vertex evenly over its adjacent vertices and
- * emits one {@link VertexWithRank} per neighbor.
- *
- * <p>When the current superstep equals the configured failing iteration and this subtask
- * is one of the configured failing workers, each emitted record is only collected if a
- * random draw is at least the configured message loss.
- */
-public class CustomCompensatableDotProductMatch extends AbstractRichFunction implements
-        FlatJoinFunction<VertexWithRankAndDangling, VertexWithAdjacencyList, VertexWithRank>
-{
-    private static final long serialVersionUID = 1L;
-
-    /** Reused output record; overwritten for every neighbor. */
-    private VertexWithRank record = new VertexWithRank();
-
-    private Random random = new Random();
-
-    private double messageLoss;
-
-    private boolean isFailure;
-
-    /**
-     * Determines whether this subtask simulates a failure in the current superstep and
-     * reads the message-loss setting.
-     *
-     * @param parameters configuration holding the "compensation.*" keys
-     */
-    @Override
-    public void open(Configuration parameters) throws Exception {
-        final int subtask = getRuntimeContext().getIndexOfThisSubtask();
-        final int superstep = getIterationRuntimeContext().getSuperstepNumber();
-        final int failingIteration = ConfigUtils.asInteger("compensation.failingIteration", parameters);
-        final Set<Integer> failingWorkers = ConfigUtils.asIntSet("compensation.failingWorker", parameters);
-
-        isFailure = (superstep == failingIteration) && failingWorkers.contains(subtask);
-        messageLoss = ConfigUtils.asDouble("compensation.messageLoss", parameters);
-    }
-
-    /**
-     * Emits the per-neighbor share of the vertex rank to every target in the adjacency list.
-     *
-     * @param pageWithRank the vertex with its current rank
-     * @param adjacencyList the targets of the vertex; only the first {@code getNumTargets()} are used
-     * @param collector receives the reused output record once per (non-dropped) neighbor
-     */
-    @Override
-    public void join(VertexWithRankAndDangling pageWithRank, VertexWithAdjacencyList adjacencyList, Collector<VertexWithRank> collector)
-        throws Exception
-    {
-        final double rank = pageWithRank.getRank();
-        final long[] targets = adjacencyList.getTargets();
-        final int targetCount = adjacencyList.getNumTargets();
-
-        // every neighbor receives the same share, so the rank is set once up front
-        record.setRank(rank / (double) targetCount);
-
-        for (int i = 0; i < targetCount; i++) {
-            record.setVertexID(targets[i]);
-
-            // the random draw is only taken while simulating a failure
-            if (!isFailure || random.nextDouble() >= messageLoss) {
-                collector.collect(record);
-            }
-        }
-    }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatingMap.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatingMap.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatingMap.java
deleted file mode 100644
index d1bf03c..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatingMap.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.iterative.nephele.customdanglingpagerank;
-
-import java.util.Set;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.common.functions.GenericCollectorMap;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.iterative.nephele.ConfigUtils;
-import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRankAndDangling;
-import org.apache.flink.test.iterative.nephele.danglingpagerank.PageRankStats;
-import org.apache.flink.util.Collector;
-
-/**
- * Map function that adjusts vertex ranks in the superstep directly after the configured
- * failing iteration: records on failing workers are set to the uniform rank, records on
- * all other workers are multiplied by a rescale factor. In every other superstep the
- * records pass through unchanged.
- */
-@SuppressWarnings("deprecation")
-public class CustomCompensatingMap extends AbstractRichFunction
-        implements GenericCollectorMap<VertexWithRankAndDangling, VertexWithRankAndDangling> {
-
-    private static final long serialVersionUID = 1L;
-
-    private boolean isFailureIteration;
-
-    private boolean isFailingWorker;
-
-    private double uniformRank;
-
-    private double rescaleFactor;
-
-    /**
-     * Reads the failure settings and, from the second superstep on, derives the uniform
-     * rank and the rescale factor from the previous superstep's aggregate.
-     *
-     * @param parameters configuration holding the "compensation.*" and "pageRank.*" keys
-     */
-    @Override
-    public void open(Configuration parameters) throws Exception {
-        final int superstep = getIterationRuntimeContext().getSuperstepNumber();
-        final int failingIteration = ConfigUtils.asInteger("compensation.failingIteration", parameters);
-        // compensation happens in the superstep that follows the failing one
-        isFailureIteration = superstep == failingIteration + 1;
-
-        final int subtask = getRuntimeContext().getIndexOfThisSubtask();
-        final Set<Integer> failingWorkers = ConfigUtils.asIntSet("compensation.failingWorker", parameters);
-        isFailingWorker = failingWorkers.contains(subtask);
-
-        final long numVertices = ConfigUtils.asLong("pageRank.numVertices", parameters);
-
-        if (superstep <= 1) {
-            // no previous aggregate is read in the first superstep
-            return;
-        }
-
-        final PageRankStats stats = (PageRankStats) getIterationRuntimeContext()
-                .getPreviousIterationAggregate(CustomCompensatableDotProductCoGroup.AGGREGATOR_NAME);
-
-        final double totalVertices = (double) numVertices;
-        uniformRank = 1d / totalVertices;
-
-        final double lostMassFactor = (numVertices - stats.numVertices()) / totalVertices;
-        rescaleFactor = (1 - lostMassFactor) / stats.rank();
-    }
-
-    /**
-     * Forwards the record, rewriting its rank first when this is the compensation superstep.
-     *
-     * @param pageWithRank the record to forward; modified in place
-     * @param out receives the record
-     */
-    @Override
-    public void map(VertexWithRankAndDangling pageWithRank, Collector<VertexWithRankAndDangling> out) throws Exception {
-
-        if (isFailureIteration) {
-            final double previousRank = pageWithRank.getRank();
-            final double compensatedRank = isFailingWorker ? uniformRank : previousRank * rescaleFactor;
-            pageWithRank.setRank(compensatedRank);
-        }
-
-        out.collect(pageWithRank);
-    }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomImprovedAdjacencyListInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomImprovedAdjacencyListInputFormat.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomImprovedAdjacencyListInputFormat.java
deleted file mode 100644
index 863c081..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomImprovedAdjacencyListInputFormat.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.iterative.nephele.customdanglingpagerank;
-
-import org.apache.flink.api.common.io.DelimitedInputFormat;
-import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithAdjacencyList;
-import org.apache.flink.test.iterative.nephele.danglingpagerank.AsciiLongArrayView;
-
-/**
- * Input format that parses one delimited record into a {@link VertexWithAdjacencyList}:
- * the first element of the record becomes the vertex id, every following element becomes
- * a target. The target array of the reused record object is recycled and grown on demand.
- */
-public class CustomImprovedAdjacencyListInputFormat extends DelimitedInputFormat<VertexWithAdjacencyList> {
-    private static final long serialVersionUID = 1L;
-
-    private final AsciiLongArrayView arrayView = new AsciiLongArrayView();
-
-    /**
-     * Parses the given byte range into {@code target}.
-     *
-     * @param target the record object to fill; its target array is reused when large enough
-     * @param bytes buffer containing the record
-     * @param offset start of the record within {@code bytes}
-     * @param numBytes length of the record in bytes
-     * @return {@code target}, or {@code null} if the record is empty or contains no element
-     * @throws RuntimeException if parsing fails; the cause is attached
-     */
-    @Override
-    public VertexWithAdjacencyList readRecord(VertexWithAdjacencyList target, byte[] bytes, int offset, int numBytes) {
-
-        if (numBytes == 0) {
-            return null;
-        }
-
-        arrayView.set(bytes, offset, numBytes);
-
-        long[] list = target.getTargets();
-
-        try {
-
-            int pos = 0;
-            while (arrayView.next()) {
-
-                if (pos == 0) {
-                    target.setVertexID(arrayView.element());
-                } else {
-                    if (list.length <= pos - 1) {
-                        // Grow the array, keeping the targets already parsed for this record.
-                        // Growth only triggers when the array is exactly full, so all
-                        // list.length entries belong to the current record.
-                        long[] grown = new long[list.length < 16 ? 16 : list.length * 2];
-                        System.arraycopy(list, 0, grown, 0, list.length);
-                        list = grown;
-                        target.setTargets(list);
-                    }
-                    list[pos - 1] = arrayView.element();
-                }
-                pos++;
-            }
-
-            if (pos == 0) {
-                // No element was produced at all (presumably a line without any number);
-                // skip it like an empty line instead of reporting -1 targets.
-                return null;
-            }
-
-            target.setNumTargets(pos - 1);
-        } catch (RuntimeException e) {
-            throw new RuntimeException("Error parsing: " + arrayView.toString(), e);
-        }
-
-        return target;
-    }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomImprovedDanglingPageRankInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomImprovedDanglingPageRankInputFormat.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomImprovedDanglingPageRankInputFormat.java
deleted file mode 100644
index ec5b6d1..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomImprovedDanglingPageRankInputFormat.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.iterative.nephele.customdanglingpagerank;
-
-import org.apache.flink.api.common.io.DelimitedInputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.iterative.nephele.ConfigUtils;
-import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRankAndDangling;
-import org.apache.flink.test.iterative.nephele.danglingpagerank.AsciiLongArrayView;
-
-/**
- * Input format that parses one delimited record into a {@link VertexWithRankAndDangling}:
- * the first element is the vertex id, an optional second element equal to the dangling
- * marker flags the vertex as dangling. Every record gets the initial rank
- * {@code 1 / numVertices}.
- */
-public class CustomImprovedDanglingPageRankInputFormat extends DelimitedInputFormat<VertexWithRankAndDangling> {
-    private static final long serialVersionUID = 1L;
-
-    private AsciiLongArrayView arrayView = new AsciiLongArrayView();
-
-    /** Value of the second element that marks a vertex as dangling. */
-    private static final long DANGLING_MARKER = 1L;
-
-    private double initialRank;
-
-    /**
-     * Reads the vertex count and derives the initial rank before delegating to the parent.
-     *
-     * @param parameters configuration holding the "pageRank.numVertices" key
-     */
-    @Override
-    public void configure(Configuration parameters) {
-        long numVertices = ConfigUtils.asLong("pageRank.numVertices", parameters);
-        initialRank = 1.0 / numVertices;
-        super.configure(parameters);
-    }
-
-    /**
-     * Parses the given byte range into {@code target}.
-     *
-     * @param target the record object to fill
-     * @param bytes buffer containing the record
-     * @param offset start of the record within {@code bytes}
-     * @param numBytes length of the record in bytes
-     * @return {@code target}, or {@code null} for an empty record
-     * @throws RuntimeException if an element is not a valid number; the cause is attached
-     */
-    @Override
-    public VertexWithRankAndDangling readRecord(VertexWithRankAndDangling target, byte[] bytes, int offset, int numBytes) {
-
-        // Skip empty records, consistent with CustomImprovedAdjacencyListInputFormat.
-        if (numBytes == 0) {
-            return null;
-        }
-
-        arrayView.set(bytes, offset, numBytes);
-
-        try {
-            arrayView.next();
-            target.setVertexID(arrayView.element());
-
-            // the dangling flag is optional; a missing second element means "not dangling"
-            if (arrayView.next()) {
-                target.setDangling(arrayView.element() == DANGLING_MARKER);
-            } else {
-                target.setDangling(false);
-            }
-
-        } catch (NumberFormatException e) {
-            throw new RuntimeException("Error parsing " + arrayView.toString(), e);
-        }
-
-        target.setRank(initialRank);
-        return target;
-    }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomPageWithRankOutFormat.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomPageWithRankOutFormat.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomPageWithRankOutFormat.java
deleted file mode 100644
index b4ffb25..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomPageWithRankOutFormat.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.iterative.nephele.customdanglingpagerank;
-
-import com.google.common.base.Charsets;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.io.FileOutputFormat;
-import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRankAndDangling;
-
-/**
- * Output format that writes each record as a UTF-8 line of the form
- * {@code <vertexID> TAB <rank> NEWLINE}.
- */
-public class CustomPageWithRankOutFormat extends FileOutputFormat<VertexWithRankAndDangling> {
-    private static final long serialVersionUID = 1L;
-
-    /** Reused line buffer; cleared at the start of every write. */
-    private final StringBuilder buffer = new StringBuilder();
-
-    /**
-     * Formats the record into the reused buffer and writes the encoded line to the stream.
-     *
-     * @param record the record whose vertex id and rank are written
-     * @throws IOException if writing to the underlying stream fails
-     */
-    @Override
-    public void writeRecord(VertexWithRankAndDangling record) throws IOException {
-        buffer.setLength(0);
-        buffer.append(record.getVertexID())
-                .append('\t')
-                .append(record.getRank())
-                .append('\n');
-
-        final String line = buffer.toString();
-        stream.write(line.getBytes(Charsets.UTF_8));
-    }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java
deleted file mode 100644
index e2a160d..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.iterative.nephele.customdanglingpagerank;
-
-import java.util.Iterator;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.common.functions.GroupCombineFunction;
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRank;
-import org.apache.flink.util.Collector;
-
-
-/**
- * Combiner that collapses all {@link VertexWithRank} records of one group into a single
- * record carrying the vertex id of the first record and the sum of all ranks.
- * Only {@code combine} is supported; {@code reduce} always throws.
- */
-public class CustomRankCombiner extends AbstractRichFunction
-        implements GroupReduceFunction<VertexWithRank, VertexWithRank>,
-        GroupCombineFunction<VertexWithRank, VertexWithRank>
-{
-    private static final long serialVersionUID = 1L;
-
-    /** Reused output record; overwritten on every {@code combine} call. */
-    private final VertexWithRank accumulator = new VertexWithRank();
-
-    /**
-     * Not supported.
-     *
-     * @throws UnsupportedOperationException always
-     */
-    @Override
-    public void reduce(Iterable<VertexWithRank> records, Collector<VertexWithRank> out) throws Exception {
-        throw new UnsupportedOperationException();
-    }
-
-    /**
-     * Sums the ranks of the group and emits one record for it.
-     *
-     * @param recordsIterable the records of one group; expected to contain at least one record
-     * @param out receives the reused output record
-     */
-    @Override
-    public void combine(Iterable<VertexWithRank> recordsIterable, Collector<VertexWithRank> out) throws Exception {
-        final Iterator<VertexWithRank> group = recordsIterable.iterator();
-
-        // read id and rank of the first record before advancing the iterator
-        final VertexWithRank first = group.next();
-        accumulator.setVertexID(first.getVertexID());
-        double rankSum = first.getRank();
-
-        while (group.hasNext()) {
-            final VertexWithRank following = group.next();
-            rankSum += following.getRank();
-        }
-
-        accumulator.setRank(rankSum);
-        out.collect(accumulator);
-    }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyList.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyList.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyList.java
deleted file mode 100644
index d19dd59..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyList.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.iterative.nephele.customdanglingpagerank.types;
-
-
-/**
- * Mutable record holding a vertex id and its adjacency list.
- *
- * <p>The target array may be larger than the number of valid targets; only the first
- * {@link #getNumTargets()} entries of {@link #getTargets()} are meaningful.
- */
-public final class VertexWithAdjacencyList {
-
-    private static final long[] EMPTY = new long[0];
-
-    private long vertexID;
-
-    private long[] targets;
-
-    private int numTargets;
-
-    /**
-     * Creates a record with vertex id 0 and no targets.
-     */
-    public VertexWithAdjacencyList() {
-        this.targets = EMPTY;
-    }
-
-    /**
-     * Creates a record for the given vertex whose targets are all entries of the array.
-     *
-     * @param vertexID the vertex id
-     * @param targets the target array; stored as is, not copied
-     */
-    public VertexWithAdjacencyList(long vertexID, long[] targets) {
-        this.vertexID = vertexID;
-        this.targets = targets;
-        // every entry of the supplied array counts as a valid target
-        this.numTargets = targets == null ? 0 : targets.length;
-    }
-
-
-    public long getVertexID() {
-        return vertexID;
-    }
-
-    public void setVertexID(long vertexID) {
-        this.vertexID = vertexID;
-    }
-
-    /**
-     * @return the backing target array; may contain more entries than {@link #getNumTargets()}
-     */
-    public long[] getTargets() {
-        return targets;
-    }
-
-    /**
-     * Replaces the backing target array without changing the number of valid targets.
-     *
-     * @param targets the new backing array
-     */
-    public void setTargets(long[] targets) {
-        this.targets = targets;
-    }
-
-    /**
-     * @return the number of valid entries at the start of the target array
-     */
-    public int getNumTargets() {
-        return numTargets;
-    }
-
-    public void setNumTargets(int numTargets) {
-        this.numTargets = numTargets;
-    }
-
-
-    /**
-     * @return the vertex id, followed by " : " and the comma-separated valid targets
-     */
-    @Override
-    public String toString() {
-        StringBuilder bld = new StringBuilder(32);
-        bld.append(this.vertexID);
-        bld.append(" : ");
-        for (int i = 0; i < this.numTargets; i++) {
-            if (i != 0) {
-                bld.append(',');
-            }
-            bld.append(this.targets[i]);
-        }
-        return bld.toString();
-    }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListComparator.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListComparator.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListComparator.java
deleted file mode 100644
index 7d11530..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListComparator.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.iterative.nephele.customdanglingpagerank.types;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.base.LongComparator;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.MemorySegment;
-
-public final class VertexWithAdjacencyListComparator extends TypeComparator<VertexWithAdjacencyList> {
-
- private static final long serialVersionUID = 1L;
-
- private long reference;
-
- @SuppressWarnings("rawtypes")
- private TypeComparator[] comparators = new TypeComparator[]{new LongComparator(true)};
-
- @Override
- public int hash(VertexWithAdjacencyList record) {
- final long value = record.getVertexID();
- return 43 + (int) (value ^ value >>> 32);
- }
-
- @Override
- public void setReference(VertexWithAdjacencyList toCompare) {
- this.reference = toCompare.getVertexID();
- }
-
- @Override
- public boolean equalToReference(VertexWithAdjacencyList candidate) {
- return candidate.getVertexID() == this.reference;
- }
-
- @Override
- public int compareToReference(TypeComparator<VertexWithAdjacencyList> referencedComparator) {
- VertexWithAdjacencyListComparator comp = (VertexWithAdjacencyListComparator) referencedComparator;
- final long diff = comp.reference - this.reference;
- return diff < 0 ? -1 : diff > 0 ? 1 : 0;
- }
-
- @Override
- public int compare(VertexWithAdjacencyList first, VertexWithAdjacencyList second) {
- final long diff = first.getVertexID() - second.getVertexID();
- return diff < 0 ? -1 : diff > 0 ? 1 : 0;
- }
-
- @Override
- public int compareSerialized(DataInputView source1, DataInputView source2) throws IOException {
- final long diff = source1.readLong() - source2.readLong();
- return diff < 0 ? -1 : diff > 0 ? 1 : 0;
- }
-
- @Override
- public boolean supportsNormalizedKey() {
- return true;
- }
-
- @Override
- public int getNormalizeKeyLen() {
- return 8;
- }
-
- @Override
- public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
- return keyBytes < 8;
- }
-
- @Override
- public void putNormalizedKey(VertexWithAdjacencyList record, MemorySegment target, int offset, int len) {
- final long value = record.getVertexID() - Long.MIN_VALUE;
-
- // see IntValue for an explanation of the logic
- if (len == 8) {
- // default case, full normalized key
- target.putLongBigEndian(offset, value);
- }
- else if (len <= 0) {
- }
- else if (len < 8) {
- for (int i = 0; len > 0; len--, i++) {
- target.put(offset + i, (byte) ((value >>> ((3-i)<<3)) & 0xff));
- }
- }
- else {
- target.putLongBigEndian(offset, value);
- for (int i = 8; i < len; i++) {
- target.put(offset + i, (byte) 0);
- }
- }
- }
-
- @Override
- public boolean invertNormalizedKey() {
- return false;
- }
-
- @Override
- public boolean supportsSerializationWithKeyNormalization() {
- return false;
- }
-
- @Override
- public void writeWithKeyNormalization(VertexWithAdjacencyList record, DataOutputView target) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public VertexWithAdjacencyList readWithKeyDenormalization(VertexWithAdjacencyList reuse, DataInputView source) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public VertexWithAdjacencyListComparator duplicate() {
- return new VertexWithAdjacencyListComparator();
- }
-
- @Override
- public int extractKeys(Object record, Object[] target, int index) {
- target[index] = ((VertexWithAdjacencyList) record).getVertexID();
- return 1;
- }
-
- @Override
- @SuppressWarnings("rawtypes")
- public TypeComparator[] getFlatComparators() {
- return comparators;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListComparatorFactory.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListComparatorFactory.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListComparatorFactory.java
deleted file mode 100644
index 0bc3263..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListComparatorFactory.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.iterative.nephele.customdanglingpagerank.types;
-
-import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
-import org.apache.flink.configuration.Configuration;
-
-/**
- *
- */
-public final class VertexWithAdjacencyListComparatorFactory implements TypeComparatorFactory<VertexWithAdjacencyList> {
-
- @Override
- public void writeParametersToConfig(Configuration config) {}
-
- @Override
- public void readParametersFromConfig(Configuration config, ClassLoader cl) throws ClassNotFoundException {}
-
- @Override
- public VertexWithAdjacencyListComparator createComparator() {
- return new VertexWithAdjacencyListComparator();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListSerializer.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListSerializer.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListSerializer.java
deleted file mode 100644
index 751ced3..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListSerializer.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.iterative.nephele.customdanglingpagerank.types;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-public final class VertexWithAdjacencyListSerializer extends TypeSerializerSingleton<VertexWithAdjacencyList> {
-
- private static final long serialVersionUID = 1L;
-
-
- @Override
- public boolean isImmutableType() {
- return false;
- }
-
- @Override
- public VertexWithAdjacencyList createInstance() {
- return new VertexWithAdjacencyList();
- }
-
- @Override
- public VertexWithAdjacencyList copy(VertexWithAdjacencyList from) {
- VertexWithAdjacencyList copy = new VertexWithAdjacencyList(from.getVertexID(), new long[from.getNumTargets()]);
- copy.setNumTargets(from.getNumTargets());
- System.arraycopy(from.getTargets(), 0, copy.getTargets(), 0, from.getNumTargets());
- return copy;
- }
-
- @Override
- public VertexWithAdjacencyList copy(VertexWithAdjacencyList from, VertexWithAdjacencyList reuse) {
- if (reuse.getTargets().length < from.getTargets().length) {
- reuse.setTargets(new long[from.getTargets().length]);
- }
-
- reuse.setVertexID(from.getVertexID());
- reuse.setNumTargets(from.getNumTargets());
- System.arraycopy(from.getTargets(), 0, reuse.getTargets(), 0, from.getNumTargets());
- return reuse;
- }
-
- @Override
- public int getLength() {
- return -1;
- }
-
- @Override
- public void serialize(VertexWithAdjacencyList record, DataOutputView target) throws IOException {
- target.writeLong(record.getVertexID());
-
- final long[] targets = record.getTargets();
- final int numTargets = record.getNumTargets();
- target.writeInt(numTargets);
-
- for (int i = 0; i < numTargets; i++) {
- target.writeLong(targets[i]);
- }
- }
-
- @Override
- public VertexWithAdjacencyList deserialize(DataInputView source) throws IOException {
- return deserialize(new VertexWithAdjacencyList(), source);
- }
-
- @Override
- public VertexWithAdjacencyList deserialize(VertexWithAdjacencyList target, DataInputView source) throws IOException {
- target.setVertexID(source.readLong());
-
- final int numTargets = source.readInt();
- long[] targets = target.getTargets();
- if (targets.length < numTargets) {
- targets = new long[numTargets];
- target.setTargets(targets);
- }
-
- target.setNumTargets(numTargets);
-
- for (int i = 0; i < numTargets; i++) {
- targets[i] = source.readLong();
- }
- return target;
- }
-
- @Override
- public void copy(DataInputView source, DataOutputView target) throws IOException {
- target.write(source, 8);
-
- final int numTargets = source.readInt();
- target.writeInt(numTargets);
- target.write(source, numTargets * 8);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListSerializerFactory.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListSerializerFactory.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListSerializerFactory.java
deleted file mode 100644
index cdd50a6..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListSerializerFactory.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.iterative.nephele.customdanglingpagerank.types;
-
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.configuration.Configuration;
-
-
-public final class VertexWithAdjacencyListSerializerFactory implements TypeSerializerFactory<VertexWithAdjacencyList> {
-
- private static final VertexWithAdjacencyListSerializer INSTANCE = new VertexWithAdjacencyListSerializer();
-
- @Override
- public void writeParametersToConfig(Configuration config) {}
-
- @Override
- public void readParametersFromConfig(Configuration config, ClassLoader cl) throws ClassNotFoundException {}
-
- @Override
- public VertexWithAdjacencyListSerializer getSerializer() {
- return INSTANCE;
- }
-
- @Override
- public Class<VertexWithAdjacencyList> getDataType() {
- return VertexWithAdjacencyList.class;
- }
-
- // --------------------------------------------------------------------------------------------
-
- @Override
- public int hashCode() {
- return 1;
- }
-
- @Override
- public boolean equals(Object obj) {
- return obj != null && obj.getClass() == VertexWithAdjacencyListSerializerFactory.class;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRank.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRank.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRank.java
deleted file mode 100644
index e62185e..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRank.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.iterative.nephele.customdanglingpagerank.types;
-
-import java.io.Serializable;
-
-
-/**
- *
- */
-public final class VertexWithRank implements Serializable {
- private static final long serialVersionUID = 1L;
-
- private long vertexID;
-
- private double rank;
-
-
- public VertexWithRank() {
- }
-
- public VertexWithRank(long vertexID, double rank) {
- this.vertexID = vertexID;
- this.rank = rank;
- }
-
-
- public long getVertexID() {
- return vertexID;
- }
-
- public void setVertexID(long vertexID) {
- this.vertexID = vertexID;
- }
-
- public double getRank() {
- return rank;
- }
-
- public void setRank(double rank) {
- this.rank = rank;
- }
-
-
- @Override
- public String toString() {
- return this.vertexID + " - " + this.rank;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDangling.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDangling.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDangling.java
deleted file mode 100644
index 4cdb594..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDangling.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.iterative.nephele.customdanglingpagerank.types;
-
-import java.io.Serializable;
-
-
-/**
- *
- */
-public final class VertexWithRankAndDangling implements Serializable {
- private static final long serialVersionUID = 1L;
-
- private long vertexID;
-
- private double rank;
-
- private boolean dangling;
-
-
- public VertexWithRankAndDangling() {
- }
-
- public VertexWithRankAndDangling(long vertexID, double rank, boolean dangling) {
- this.vertexID = vertexID;
- this.rank = rank;
- this.dangling = dangling;
- }
-
-
- public long getVertexID() {
- return vertexID;
- }
-
- public void setVertexID(long vertexID) {
- this.vertexID = vertexID;
- }
-
- public double getRank() {
- return rank;
- }
-
- public void setRank(double rank) {
- this.rank = rank;
- }
-
- public boolean isDangling() {
- return dangling;
- }
-
- public void setDangling(boolean dangling) {
- this.dangling = dangling;
- }
-
-
- @Override
- public String toString() {
- return this.vertexID + " - " + this.rank + (this.isDangling() ? " (dangling)" : "");
- }
-}