You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/30 23:44:38 UTC
[3/5] flink git commit: [FLINK-1681] [tests] Remove outdated 'nephele' iteration tests.
http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingComparator.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingComparator.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingComparator.java
deleted file mode 100644
index d83c3fb..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingComparator.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.iterative.nephele.customdanglingpagerank.types;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.base.LongComparator;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.MemorySegment;
-
-public final class VertexWithRankAndDanglingComparator extends TypeComparator<VertexWithRankAndDangling> {
-
- private static final long serialVersionUID = 1L;
-
- private long reference;
-
- @SuppressWarnings("rawtypes")
- private TypeComparator[] comparators = new TypeComparator[]{new LongComparator(true)};
-
- @Override
- public int hash(VertexWithRankAndDangling record) {
- final long value = record.getVertexID();
- return 43 + (int) (value ^ value >>> 32);
- }
-
- @Override
- public void setReference(VertexWithRankAndDangling toCompare) {
- this.reference = toCompare.getVertexID();
- }
-
- @Override
- public boolean equalToReference(VertexWithRankAndDangling candidate) {
- return candidate.getVertexID() == this.reference;
- }
-
- @Override
- public int compareToReference(TypeComparator<VertexWithRankAndDangling> referencedComparator) {
- VertexWithRankAndDanglingComparator comp = (VertexWithRankAndDanglingComparator) referencedComparator;
- final long diff = comp.reference - this.reference;
- return diff < 0 ? -1 : diff > 0 ? 1 : 0;
- }
-
- @Override
- public int compare(VertexWithRankAndDangling first, VertexWithRankAndDangling second) {
- final long diff = first.getVertexID() - second.getVertexID();
- return diff < 0 ? -1 : diff > 0 ? 1 : 0;
- }
-
- @Override
- public int compareSerialized(DataInputView source1, DataInputView source2) throws IOException {
- final long diff = source1.readLong() - source2.readLong();
- return diff < 0 ? -1 : diff > 0 ? 1 : 0;
- }
-
- @Override
- public boolean supportsNormalizedKey() {
- return true;
- }
-
- @Override
- public int getNormalizeKeyLen() {
- return 8;
- }
-
- @Override
- public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
- return keyBytes < 8;
- }
-
- @Override
- public void putNormalizedKey(VertexWithRankAndDangling record, MemorySegment target, int offset, int len) {
- final long value = record.getVertexID() - Long.MIN_VALUE;
-
- // see IntValue for an explanation of the logic
- if (len == 8) {
- // default case, full normalized key
- target.putLongBigEndian(offset, value);
- }
- else if (len <= 0) {
- }
- else if (len < 8) {
- for (int i = 0; len > 0; len--, i++) {
- target.put(offset + i, (byte) ((value >>> ((3-i)<<3)) & 0xff));
- }
- }
- else {
- target.putLongBigEndian(offset, value);
- for (int i = 8; i < len; i++) {
- target.put(offset + i, (byte) 0);
- }
- }
- }
-
- @Override
- public boolean invertNormalizedKey() {
- return false;
- }
-
- @Override
- public boolean supportsSerializationWithKeyNormalization() {
- return true;
- }
-
- @Override
- public void writeWithKeyNormalization(VertexWithRankAndDangling record, DataOutputView target) throws IOException {
- target.writeLong(record.getVertexID() - Long.MIN_VALUE);
- target.writeDouble(record.getRank());
- target.writeBoolean(record.isDangling());
- }
-
- @Override
- public VertexWithRankAndDangling readWithKeyDenormalization(VertexWithRankAndDangling reuse, DataInputView source) throws IOException {
- reuse.setVertexID(source.readLong() + Long.MIN_VALUE);
- reuse.setRank(source.readDouble());
- reuse.setDangling(source.readBoolean());
- return reuse;
- }
-
- @Override
- public VertexWithRankAndDanglingComparator duplicate() {
- return new VertexWithRankAndDanglingComparator();
- }
-
- @Override
- public int extractKeys(Object record, Object[] target, int index) {
- target[index] = ((VertexWithRankAndDangling) record).getVertexID();
- return 1;
- }
-
- @Override
- @SuppressWarnings("rawtypes")
- public TypeComparator[] getFlatComparators() {
- return comparators;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingComparatorFactory.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingComparatorFactory.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingComparatorFactory.java
deleted file mode 100644
index 532ca3e..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingComparatorFactory.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.iterative.nephele.customdanglingpagerank.types;
-
-import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
-import org.apache.flink.configuration.Configuration;
-
-/**
- *
- */
-public final class VertexWithRankAndDanglingComparatorFactory implements TypeComparatorFactory<VertexWithRankAndDangling> {
-
- @Override
- public void writeParametersToConfig(Configuration config) {}
-
- @Override
- public void readParametersFromConfig(Configuration config, ClassLoader cl) throws ClassNotFoundException {}
-
- @Override
- public VertexWithRankAndDanglingComparator createComparator() {
- return new VertexWithRankAndDanglingComparator();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingSerializer.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingSerializer.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingSerializer.java
deleted file mode 100644
index 8ff0233..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingSerializer.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.iterative.nephele.customdanglingpagerank.types;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-public final class VertexWithRankAndDanglingSerializer extends TypeSerializerSingleton<VertexWithRankAndDangling> {
-
- private static final long serialVersionUID = 1L;
-
-
- @Override
- public boolean isImmutableType() {
- return false;
- }
-
- @Override
- public VertexWithRankAndDangling createInstance() {
- return new VertexWithRankAndDangling();
- }
-
- @Override
- public VertexWithRankAndDangling copy(VertexWithRankAndDangling from) {
- return new VertexWithRankAndDangling(from.getVertexID(), from.getRank(), from.isDangling());
- }
-
- @Override
- public VertexWithRankAndDangling copy(VertexWithRankAndDangling from, VertexWithRankAndDangling reuse) {
- reuse.setVertexID(from.getVertexID());
- reuse.setRank(from.getRank());
- reuse.setDangling(from.isDangling());
- return reuse;
- }
-
- @Override
- public int getLength() {
- return 17;
- }
-
- @Override
- public void serialize(VertexWithRankAndDangling record, DataOutputView target) throws IOException {
- target.writeLong(record.getVertexID());
- target.writeDouble(record.getRank());
- target.writeBoolean(record.isDangling());
- }
-
- @Override
- public VertexWithRankAndDangling deserialize(DataInputView source) throws IOException {
- return new VertexWithRankAndDangling(source.readLong(), source.readDouble(), source.readBoolean());
- }
-
- @Override
- public VertexWithRankAndDangling deserialize(VertexWithRankAndDangling target, DataInputView source) throws IOException {
- target.setVertexID(source.readLong());
- target.setRank(source.readDouble());
- target.setDangling(source.readBoolean());
- return target;
- }
-
- @Override
- public void copy(DataInputView source, DataOutputView target) throws IOException {
- target.write(source, 17);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingSerializerFactory.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingSerializerFactory.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingSerializerFactory.java
deleted file mode 100644
index 7f40d72..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingSerializerFactory.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.iterative.nephele.customdanglingpagerank.types;
-
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.configuration.Configuration;
-
-
-public final class VertexWithRankAndDanglingSerializerFactory implements TypeSerializerFactory<VertexWithRankAndDangling> {
-
- private static final VertexWithRankAndDanglingSerializer INSTANCE = new VertexWithRankAndDanglingSerializer();
-
- @Override
- public void writeParametersToConfig(Configuration config) {}
-
- @Override
- public void readParametersFromConfig(Configuration config, ClassLoader cl) throws ClassNotFoundException {}
-
- @Override
- public VertexWithRankAndDanglingSerializer getSerializer() {
- return INSTANCE;
- }
-
- @Override
- public Class<VertexWithRankAndDangling> getDataType() {
- return VertexWithRankAndDangling.class;
- }
-
- // --------------------------------------------------------------------------------------------
-
- @Override
- public int hashCode() {
- return 1;
- }
-
- @Override
- public boolean equals(Object obj) {
- return obj != null && obj.getClass() == VertexWithRankAndDanglingSerializerFactory.class;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankComparator.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankComparator.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankComparator.java
deleted file mode 100644
index 9107f4b..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankComparator.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.iterative.nephele.customdanglingpagerank.types;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.base.LongComparator;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.MemorySegment;
-
-public final class VertexWithRankComparator extends TypeComparator<VertexWithRank> {
-
- private static final long serialVersionUID = 1L;
-
- private long reference;
-
- @SuppressWarnings("rawtypes")
- private TypeComparator[] comparators = new TypeComparator[]{new LongComparator(true)};
-
- @Override
- public int hash(VertexWithRank record) {
- final long value = record.getVertexID();
- return 43 + (int) (value ^ value >>> 32);
- }
-
- @Override
- public void setReference(VertexWithRank toCompare) {
- this.reference = toCompare.getVertexID();
- }
-
- @Override
- public boolean equalToReference(VertexWithRank candidate) {
- return candidate.getVertexID() == this.reference;
- }
-
- @Override
- public int compareToReference(TypeComparator<VertexWithRank> referencedComparator) {
- VertexWithRankComparator comp = (VertexWithRankComparator) referencedComparator;
- final long diff = comp.reference - this.reference;
- return diff < 0 ? -1 : diff > 0 ? 1 : 0;
- }
-
- @Override
- public int compare(VertexWithRank first, VertexWithRank second) {
- final long diff = first.getVertexID() - second.getVertexID();
- return diff < 0 ? -1 : diff > 0 ? 1 : 0;
- }
-
- @Override
- public int compareSerialized(DataInputView source1, DataInputView source2) throws IOException {
- final long diff = source1.readLong() - source2.readLong();
- return diff < 0 ? -1 : diff > 0 ? 1 : 0;
- }
-
- @Override
- public boolean supportsNormalizedKey() {
- return true;
- }
-
- @Override
- public int getNormalizeKeyLen() {
- return 8;
- }
-
- @Override
- public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
- return keyBytes < 8;
- }
-
- @Override
- public void putNormalizedKey(VertexWithRank record, MemorySegment target, int offset, int len) {
- final long value = record.getVertexID() - Long.MIN_VALUE;
-
- // see IntValue for an explanation of the logic
- if (len == 8) {
- // default case, full normalized key
- target.putLongBigEndian(offset, value);
- }
- else if (len <= 0) {
- }
- else if (len < 8) {
- for (int i = 0; len > 0; len--, i++) {
- target.put(offset + i, (byte) ((value >>> ((3-i)<<3)) & 0xff));
- }
- }
- else {
- target.putLongBigEndian(offset, value);
- for (int i = 8; i < len; i++) {
- target.put(offset + i, (byte) 0);
- }
- }
- }
-
- @Override
- public boolean invertNormalizedKey() {
- return false;
- }
-
- @Override
- public boolean supportsSerializationWithKeyNormalization() {
- return true;
- }
-
- @Override
- public void writeWithKeyNormalization(VertexWithRank record, DataOutputView target) throws IOException {
- target.writeLong(record.getVertexID() - Long.MIN_VALUE);
- target.writeDouble(record.getRank());
- }
-
- @Override
- public VertexWithRank readWithKeyDenormalization(VertexWithRank reuse, DataInputView source) throws IOException {
- reuse.setVertexID(source.readLong() + Long.MIN_VALUE);
- reuse.setRank(source.readDouble());
- return reuse;
- }
-
- @Override
- public VertexWithRankComparator duplicate() {
- return new VertexWithRankComparator();
- }
-
- @Override
- public int extractKeys(Object record, Object[] target, int index) {
- target[index] = ((VertexWithRank) record).getVertexID();
- return 1;
- }
-
- @Override
- @SuppressWarnings("rawtypes")
- public TypeComparator[] getFlatComparators() {
- return comparators;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankComparatorFactory.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankComparatorFactory.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankComparatorFactory.java
deleted file mode 100644
index 33ddb82..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankComparatorFactory.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.iterative.nephele.customdanglingpagerank.types;
-
-import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
-import org.apache.flink.configuration.Configuration;
-
-/**
- *
- */
-public final class VertexWithRankComparatorFactory implements TypeComparatorFactory<VertexWithRank> {
-
- @Override
- public void writeParametersToConfig(Configuration config) {}
-
- @Override
- public void readParametersFromConfig(Configuration config, ClassLoader cl) throws ClassNotFoundException {}
-
- @Override
- public VertexWithRankComparator createComparator() {
- return new VertexWithRankComparator();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankDanglingToVertexWithAdjacencyListPairComparatorFactory.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankDanglingToVertexWithAdjacencyListPairComparatorFactory.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankDanglingToVertexWithAdjacencyListPairComparatorFactory.java
deleted file mode 100644
index 802fa16..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankDanglingToVertexWithAdjacencyListPairComparatorFactory.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.iterative.nephele.customdanglingpagerank.types;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
-
-
-/**
- *
- */
-public class VertexWithRankDanglingToVertexWithAdjacencyListPairComparatorFactory
- implements TypePairComparatorFactory<VertexWithRankAndDangling, VertexWithAdjacencyList>
-{
-
- @Override
- public VertexWithRankDanglingToVertexWithAdjacencyListPairComparator createComparator12(
- TypeComparator<VertexWithRankAndDangling> comparator1, TypeComparator<VertexWithAdjacencyList> comparator2)
- {
- return new VertexWithRankDanglingToVertexWithAdjacencyListPairComparator();
- }
-
- @Override
- public VertexWithAdjacencyListToVertexWithRankDanglingPairComparator createComparator21(
- TypeComparator<VertexWithRankAndDangling> comparator1, TypeComparator<VertexWithAdjacencyList> comparator2)
- {
- return new VertexWithAdjacencyListToVertexWithRankDanglingPairComparator();
- }
-
-
- public static final class VertexWithRankDanglingToVertexWithAdjacencyListPairComparator
- extends TypePairComparator<VertexWithRankAndDangling, VertexWithAdjacencyList>
- {
- private long reference;
-
- @Override
- public void setReference(VertexWithRankAndDangling reference) {
- this.reference = reference.getVertexID();
- }
-
- @Override
- public boolean equalToReference(VertexWithAdjacencyList candidate) {
- return this.reference == candidate.getVertexID();
- }
-
- @Override
- public int compareToReference(VertexWithAdjacencyList candidate) {
- long diff = candidate.getVertexID() - this.reference;
- return diff < 0 ? -1 : diff > 0 ? 1 : 0;
- }
- }
-
- public static final class VertexWithAdjacencyListToVertexWithRankDanglingPairComparator
- extends TypePairComparator<VertexWithAdjacencyList, VertexWithRankAndDangling>
- {
- private long reference;
-
- @Override
- public void setReference(VertexWithAdjacencyList reference) {
- this.reference = reference.getVertexID();
- }
-
- @Override
- public boolean equalToReference(VertexWithRankAndDangling candidate) {
- return this.reference == candidate.getVertexID();
- }
-
- @Override
- public int compareToReference(VertexWithRankAndDangling candidate) {
- long diff = candidate.getVertexID() - this.reference;
- return diff < 0 ? -1 : diff > 0 ? 1 : 0;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankDanglingToVertexWithRankPairComparatorFactory.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankDanglingToVertexWithRankPairComparatorFactory.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankDanglingToVertexWithRankPairComparatorFactory.java
deleted file mode 100644
index 674d85a..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankDanglingToVertexWithRankPairComparatorFactory.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.iterative.nephele.customdanglingpagerank.types;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
-
-
-/**
- *
- */
-public class VertexWithRankDanglingToVertexWithRankPairComparatorFactory
- implements TypePairComparatorFactory<VertexWithRankAndDangling, VertexWithRank>
-{
-
- @Override
- public VertexWithRankDanglingToVertexWithRankComparator createComparator12(
- TypeComparator<VertexWithRankAndDangling> comparator1, TypeComparator<VertexWithRank> comparator2)
- {
- return new VertexWithRankDanglingToVertexWithRankComparator();
- }
-
- @Override
- public VertexWithRankToVertexWithRankDanglingPairComparator createComparator21(
- TypeComparator<VertexWithRankAndDangling> comparator1, TypeComparator<VertexWithRank> comparator2)
- {
- return new VertexWithRankToVertexWithRankDanglingPairComparator();
- }
-
-
- public static final class VertexWithRankDanglingToVertexWithRankComparator
- extends TypePairComparator<VertexWithRankAndDangling, VertexWithRank>
- {
- private long reference;
-
- @Override
- public void setReference(VertexWithRankAndDangling reference) {
- this.reference = reference.getVertexID();
- }
-
- @Override
- public boolean equalToReference(VertexWithRank candidate) {
- return this.reference == candidate.getVertexID();
- }
-
- @Override
- public int compareToReference(VertexWithRank candidate) {
- long diff = candidate.getVertexID() - this.reference;
- return diff < 0 ? -1 : diff > 0 ? 1 : 0;
- }
- }
-
- public static final class VertexWithRankToVertexWithRankDanglingPairComparator
- extends TypePairComparator<VertexWithRank, VertexWithRankAndDangling>
- {
- private long reference;
-
- @Override
- public void setReference(VertexWithRank reference) {
- this.reference = reference.getVertexID();
- }
-
- @Override
- public boolean equalToReference(VertexWithRankAndDangling candidate) {
- return this.reference == candidate.getVertexID();
- }
-
- @Override
- public int compareToReference(VertexWithRankAndDangling candidate) {
- long diff = candidate.getVertexID() - this.reference;
- return diff < 0 ? -1 : diff > 0 ? 1 : 0;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankSerializer.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankSerializer.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankSerializer.java
deleted file mode 100644
index 2c3abcd..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankSerializer.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.iterative.nephele.customdanglingpagerank.types;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-public final class VertexWithRankSerializer extends TypeSerializerSingleton<VertexWithRank> {
-
- private static final long serialVersionUID = 1L;
-
-
- @Override
- public boolean isImmutableType() {
- return false;
- }
-
- @Override
- public VertexWithRank createInstance() {
- return new VertexWithRank();
- }
-
- @Override
- public VertexWithRank copy(VertexWithRank from) {
- return new VertexWithRank(from.getVertexID(), from.getRank());
- }
-
- @Override
- public VertexWithRank copy(VertexWithRank from, VertexWithRank reuse) {
- reuse.setVertexID(from.getVertexID());
- reuse.setRank(from.getRank());
- return reuse;
- }
-
- @Override
- public int getLength() {
- return 16;
- }
-
- @Override
- public void serialize(VertexWithRank record, DataOutputView target) throws IOException {
- target.writeLong(record.getVertexID());
- target.writeDouble(record.getRank());
- }
-
- @Override
- public VertexWithRank deserialize(DataInputView source) throws IOException {
- return new VertexWithRank(source.readLong(), source.readDouble());
- }
-
- @Override
- public VertexWithRank deserialize(VertexWithRank target, DataInputView source) throws IOException {
- target.setVertexID(source.readLong());
- target.setRank(source.readDouble());
- return target;
- }
-
- @Override
- public void copy(DataInputView source, DataOutputView target) throws IOException {
- target.write(source, 16);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankSerializerFactory.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankSerializerFactory.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankSerializerFactory.java
deleted file mode 100644
index 67ce028..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankSerializerFactory.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.iterative.nephele.customdanglingpagerank.types;
-
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.configuration.Configuration;
-
-
-public final class VertexWithRankSerializerFactory implements TypeSerializerFactory<VertexWithRank> {
-
- private static final VertexWithRankSerializer INSTANCE = new VertexWithRankSerializer();
-
- @Override
- public void writeParametersToConfig(Configuration config) {}
-
- @Override
- public void readParametersFromConfig(Configuration config, ClassLoader cl) throws ClassNotFoundException {}
-
- @Override
- public VertexWithRankSerializer getSerializer() {
- return INSTANCE;
- }
-
- @Override
- public Class<VertexWithRank> getDataType() {
- return VertexWithRank.class;
- }
-
- // --------------------------------------------------------------------------------------------
-
- @Override
- public int hashCode() {
- return 1;
- }
-
- @Override
- public boolean equals(Object obj) {
- return obj != null && obj.getClass() == VertexWithRankSerializerFactory.class;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/AsciiLongArrayView.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/AsciiLongArrayView.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/AsciiLongArrayView.java
deleted file mode 100644
index 05b3f9b..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/AsciiLongArrayView.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.iterative.nephele.danglingpagerank;
-
-import java.io.Serializable;
-
-import com.google.common.base.Charsets;
-
-public class AsciiLongArrayView implements Serializable {
- private static final long serialVersionUID = 1L;
-
- private byte[] buffer;
-
- private int offset;
- private int numBytes;
-
- private int tokenOffset;
- private int tokenNumBytes;
-
- private static final int NOT_SET = -1;
-
- private static final int RADIX_TEN = 10;
- private static final long MULTMIN_RADIX_TEN = Long.MIN_VALUE / 10;
- private static final long N_MULTMAX_RADIX_TEN = -Long.MAX_VALUE / 10;
-
- public void set(byte[] buffer, int offset, int numBytes) {
- this.buffer = buffer;
- this.offset = offset;
- this.numBytes = numBytes;
-
- this.tokenOffset = NOT_SET;
- checkForSingleTrailingWhitespace();
- }
-
- private void checkForSingleTrailingWhitespace() {
- if (Character.isWhitespace((char) buffer[offset + numBytes -1])) {
- numBytes--;
- }
- }
-
- public int numElements() {
- int matches = 0;
- int pos = offset;
- while (pos < offset + numBytes) {
- if (Character.isWhitespace((char) buffer[pos])) {
- matches++;
- }
- pos++;
- }
- return matches + 1;
- }
-
- public boolean next() {
-
- if (tokenOffset == NOT_SET) {
- tokenOffset = offset;
- } else {
- tokenOffset += tokenNumBytes + 1;
- if (tokenOffset > offset + numBytes) {
- return false;
- }
- }
-
- tokenNumBytes = 1;
- while (true) {
- int candidatePos = tokenOffset + tokenNumBytes;
- if (candidatePos >= offset + numBytes || Character.isWhitespace((char) buffer[candidatePos])) {
- break;
- }
- tokenNumBytes++;
- }
-
- return true;
- }
-
- private char tokenCharAt(int pos) {
- return (char) buffer[tokenOffset + pos];
- }
-
- public long element() {
-
- long result = 0;
- boolean negative = false;
- int i = 0, max = tokenNumBytes;
- long limit;
- long multmin;
- int digit;
-
- if (max > 0) {
- if (tokenCharAt(0) == '-') {
- negative = true;
- limit = Long.MIN_VALUE;
- i++;
- } else {
- limit = -Long.MAX_VALUE;
- }
-
- multmin = negative ? MULTMIN_RADIX_TEN : N_MULTMAX_RADIX_TEN;
-
- if (i < max) {
- digit = Character.digit(tokenCharAt(i++), RADIX_TEN);
- if (digit < 0) {
- throw new NumberFormatException(toString());
- } else {
- result = -digit;
- }
- }
- while (i < max) {
- // Accumulating negatively avoids surprises near MAX_VALUE
- digit = Character.digit(tokenCharAt(i++), RADIX_TEN);
- if (digit < 0) {
- throw new NumberFormatException(toString());
- }
- if (result < multmin) {
- throw new NumberFormatException(toString());
- }
- result *= RADIX_TEN;
- if (result < limit + digit) {
- throw new NumberFormatException(toString());
- }
- result -= digit;
- }
- } else {
- throw new NumberFormatException(toString());
- }
- if (negative) {
- if (i > 1) {
- return result;
- } else { /* Only got "-" */
- throw new NumberFormatException(toString());
- }
- } else {
- return -result;
- }
- }
-
-// public double elementAsDouble() {
-// String token = new String(buffer, tokenOffset, tokenNumBytes, Charsets.US_ASCII);
-// return Double.valueOf(token);
-// }
-
-
- @Override
- public String toString() {
- return "[" + new String(buffer, offset, numBytes, Charsets.US_ASCII) + "] (buffer length: " + buffer.length +
- ", offset: " + offset + ", numBytes: " + numBytes + ")";
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/BooleanValue.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/BooleanValue.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/BooleanValue.java
deleted file mode 100644
index af04557..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/BooleanValue.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.iterative.nephele.danglingpagerank;
-
-import java.io.IOException;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.types.Value;
-
-public class BooleanValue implements Value {
- private static final long serialVersionUID = 1L;
-
- private boolean value;
-
- public BooleanValue(boolean value) {
- this.value = value;
- }
-
- public BooleanValue() {
- }
-
- public boolean get() {
- return value;
- }
-
- public void set(boolean value) {
- this.value = value;
- }
-
- @Override
- public void write(DataOutputView out) throws IOException {
- out.writeBoolean(value);
- }
-
- @Override
- public void read(DataInputView in) throws IOException {
- value = in.readBoolean();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java
deleted file mode 100644
index 78038b3..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java
+++ /dev/null
@@ -1,295 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.iterative.nephele.danglingpagerank;
-
-import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
-import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
-import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.api.common.typeutils.record.RecordComparatorFactory;
-import org.apache.flink.api.common.typeutils.record.RecordPairComparatorFactory;
-import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory;
-import org.apache.flink.api.java.record.io.FileOutputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.iterative.task.IterationHeadPactTask;
-import org.apache.flink.runtime.iterative.task.IterationIntermediatePactTask;
-import org.apache.flink.runtime.iterative.task.IterationTailPactTask;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.DistributionPattern;
-import org.apache.flink.runtime.jobgraph.InputFormatVertex;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.OutputFormatVertex;
-import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.operators.BuildSecondCachedJoinDriver;
-import org.apache.flink.runtime.operators.CoGroupDriver;
-import org.apache.flink.runtime.operators.CollectorMapDriver;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.apache.flink.runtime.operators.util.TaskConfig;
-import org.apache.flink.test.iterative.nephele.JobGraphUtils;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.util.OperatingSystem;
-
-public class CompensatableDanglingPageRank {
-
- private static final TypeSerializerFactory<?> recSerializer = RecordSerializerFactory.get();
-
- @SuppressWarnings("unchecked")
- private static final TypeComparatorFactory<?> fieldZeroComparator = new RecordComparatorFactory(new int[] {0}, new Class[] {LongValue.class}, new boolean[] {true});
-
- private static final TypePairComparatorFactory<?, ?> pairComparatorFactory = new RecordPairComparatorFactory();
-
-
- private static final int NUM_FILE_HANDLES_PER_SORT = 64;
-
- private static final float SORT_SPILL_THRESHOLD = 0.85f;
-
- private static final int ITERATION_ID = 1;
-
-
-// public static void main(String[] args) throws Exception {
-// String confPath = args.length >= 6 ? confPath = args[5] : PlayConstants.PLAY_DIR + "local-conf";
-//
-// GlobalConfiguration.loadConfiguration(confPath);
-// Configuration conf = GlobalConfiguration.getConfiguration();
-//
-// JobGraph jobGraph = getJobGraph(args);
-// JobGraphUtils.submit(jobGraph, conf);
-// }
-
- public static JobGraph getJobGraph(String[] args) throws Exception {
-
- int parallelism = 2;
- String pageWithRankInputPath = ""; // "file://" + PlayConstants.PLAY_DIR + "test-inputs/danglingpagerank/pageWithRank";
- String adjacencyListInputPath = ""; // "file://" + PlayConstants.PLAY_DIR +
-// "test-inputs/danglingpagerank/adjacencylists";
- String outputPath = OperatingSystem.isWindows() ? "file:/c:/tmp/flink/iterations" : "file:///tmp/flink/iterations";
-// String confPath = PlayConstants.PLAY_DIR + "local-conf";
- int minorConsumer = 25;
- int matchMemory = 50;
- int coGroupSortMemory = 50;
- int numIterations = 25;
- long numVertices = 5;
- long numDanglingVertices = 1;
-
- String failingWorkers = "1";
- int failingIteration = 2;
- double messageLoss = 0.75;
-
- if (args.length >= 15) {
- parallelism = Integer.parseInt(args[0]);
- pageWithRankInputPath = args[1];
- adjacencyListInputPath = args[2];
- outputPath = args[3];
-// confPath = args[4];
- minorConsumer = Integer.parseInt(args[5]);
- matchMemory = Integer.parseInt(args[6]);
- coGroupSortMemory = Integer.parseInt(args[7]);
- numIterations = Integer.parseInt(args[8]);
- numVertices = Long.parseLong(args[9]);
- numDanglingVertices = Long.parseLong(args[10]);
- failingWorkers = args[11];
- failingIteration = Integer.parseInt(args[12]);
- messageLoss = Double.parseDouble(args[13]);
- }
-
- int totalMemoryConsumption = 3*minorConsumer + matchMemory + coGroupSortMemory;
-
- JobGraph jobGraph = new JobGraph("CompensatableDanglingPageRank");
-
- // --------------- the inputs ---------------------
-
- // page rank input
- InputFormatVertex pageWithRankInput = JobGraphUtils.createInput(new ImprovedDanglingPageRankInputFormat(),
- pageWithRankInputPath, "DanglingPageWithRankInput", jobGraph, parallelism);
- TaskConfig pageWithRankInputConfig = new TaskConfig(pageWithRankInput.getConfiguration());
- pageWithRankInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
- pageWithRankInputConfig.setOutputComparator(fieldZeroComparator, 0);
- pageWithRankInputConfig.setOutputSerializer(recSerializer);
- pageWithRankInputConfig.setStubParameter("pageRank.numVertices", String.valueOf(numVertices));
-
- // edges as adjacency list
- InputFormatVertex adjacencyListInput = JobGraphUtils.createInput(new ImprovedAdjacencyListInputFormat(),
- adjacencyListInputPath, "AdjancencyListInput", jobGraph, parallelism);
- TaskConfig adjacencyListInputConfig = new TaskConfig(adjacencyListInput.getConfiguration());
- adjacencyListInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
- adjacencyListInputConfig.setOutputSerializer(recSerializer);
- adjacencyListInputConfig.setOutputComparator(fieldZeroComparator, 0);
-
- // --------------- the head ---------------------
- JobVertex head = JobGraphUtils.createTask(IterationHeadPactTask.class, "IterationHead", jobGraph, parallelism);
- TaskConfig headConfig = new TaskConfig(head.getConfiguration());
- headConfig.setIterationId(ITERATION_ID);
-
- // initial input / partial solution
- headConfig.addInputToGroup(0);
- headConfig.setIterationHeadPartialSolutionOrWorksetInputIndex(0);
- headConfig.setInputSerializer(recSerializer, 0);
- headConfig.setInputComparator(fieldZeroComparator, 0);
- headConfig.setInputLocalStrategy(0, LocalStrategy.SORT);
- headConfig.setRelativeMemoryInput(0, (double)minorConsumer/totalMemoryConsumption);
- headConfig.setFilehandlesInput(0, NUM_FILE_HANDLES_PER_SORT);
- headConfig.setSpillingThresholdInput(0, SORT_SPILL_THRESHOLD);
-
- // back channel / iterations
- headConfig.setRelativeBackChannelMemory((double)minorConsumer/totalMemoryConsumption);
-
- // output into iteration
- headConfig.setOutputSerializer(recSerializer);
- headConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
- headConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
-
- // final output
- TaskConfig headFinalOutConfig = new TaskConfig(new Configuration());
- headFinalOutConfig.setOutputSerializer(recSerializer);
- headFinalOutConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
- headConfig.setIterationHeadFinalOutputConfig(headFinalOutConfig);
-
- // the sync
- headConfig.setIterationHeadIndexOfSyncOutput(3);
- headConfig.setNumberOfIterations(numIterations);
-
- // the driver
- headConfig.setDriver(CollectorMapDriver.class);
- headConfig.setDriverStrategy(DriverStrategy.COLLECTOR_MAP);
- headConfig.setStubWrapper(new UserCodeClassWrapper<CompensatingMap>(CompensatingMap.class));
- headConfig.setStubParameter("pageRank.numVertices", String.valueOf(numVertices));
- headConfig.setStubParameter("compensation.failingWorker", failingWorkers);
- headConfig.setStubParameter("compensation.failingIteration", String.valueOf(failingIteration));
- headConfig.setStubParameter("compensation.messageLoss", String.valueOf(messageLoss));
- headConfig.addIterationAggregator(CompensatableDotProductCoGroup.AGGREGATOR_NAME, new PageRankStatsAggregator());
-
- // --------------- the join ---------------------
-
- JobVertex intermediate = JobGraphUtils.createTask(IterationIntermediatePactTask.class, "IterationIntermediate", jobGraph, parallelism);
- TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration());
- intermediateConfig.setIterationId(ITERATION_ID);
-// intermediateConfig.setDriver(RepeatableHashjoinMatchDriverWithCachedBuildside.class);
- intermediateConfig.setDriver(BuildSecondCachedJoinDriver.class);
- intermediateConfig.setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
- intermediateConfig.setRelativeMemoryDriver((double)matchMemory/totalMemoryConsumption);
- intermediateConfig.addInputToGroup(0);
- intermediateConfig.addInputToGroup(1);
- intermediateConfig.setInputSerializer(recSerializer, 0);
- intermediateConfig.setInputSerializer(recSerializer, 1);
- intermediateConfig.setDriverComparator(fieldZeroComparator, 0);
- intermediateConfig.setDriverComparator(fieldZeroComparator, 1);
- intermediateConfig.setDriverPairComparator(pairComparatorFactory);
-
- intermediateConfig.setOutputSerializer(recSerializer);
- intermediateConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
- intermediateConfig.setOutputComparator(fieldZeroComparator, 0);
-
- intermediateConfig.setStubWrapper(new UserCodeClassWrapper<CompensatableDotProductMatch>(CompensatableDotProductMatch.class));
- intermediateConfig.setStubParameter("pageRank.numVertices", String.valueOf(numVertices));
- intermediateConfig.setStubParameter("compensation.failingWorker", failingWorkers);
- intermediateConfig.setStubParameter("compensation.failingIteration", String.valueOf(failingIteration));
- intermediateConfig.setStubParameter("compensation.messageLoss", String.valueOf(messageLoss));
-
- // ---------------- the tail (co group) --------------------
-
- JobVertex tail = JobGraphUtils.createTask(IterationTailPactTask.class, "IterationTail", jobGraph,
- parallelism);
- TaskConfig tailConfig = new TaskConfig(tail.getConfiguration());
- tailConfig.setIterationId(ITERATION_ID);
- tailConfig.setIsWorksetUpdate();
- // TODO we need to combine!
-
- // inputs and driver
- tailConfig.setDriver(CoGroupDriver.class);
- tailConfig.setDriverStrategy(DriverStrategy.CO_GROUP);
- tailConfig.addInputToGroup(0);
- tailConfig.addInputToGroup(1);
- tailConfig.setInputSerializer(recSerializer, 0);
- tailConfig.setInputSerializer(recSerializer, 1);
- tailConfig.setDriverComparator(fieldZeroComparator, 0);
- tailConfig.setDriverComparator(fieldZeroComparator, 1);
- tailConfig.setDriverPairComparator(pairComparatorFactory);
- tailConfig.setInputAsynchronouslyMaterialized(0, true);
- tailConfig.setRelativeInputMaterializationMemory(0, (double)minorConsumer/totalMemoryConsumption);
- tailConfig.setInputLocalStrategy(1, LocalStrategy.SORT);
- tailConfig.setInputComparator(fieldZeroComparator, 1);
- tailConfig.setRelativeMemoryInput(1, (double)coGroupSortMemory/totalMemoryConsumption);
- tailConfig.setFilehandlesInput(1, NUM_FILE_HANDLES_PER_SORT);
- tailConfig.setSpillingThresholdInput(1, SORT_SPILL_THRESHOLD);
-
- // output
- tailConfig.setOutputSerializer(recSerializer);
-
- // the stub
- tailConfig.setStubWrapper(new UserCodeClassWrapper<CompensatableDotProductCoGroup>(CompensatableDotProductCoGroup.class));
- tailConfig.setStubParameter("pageRank.numVertices", String.valueOf(numVertices));
- tailConfig.setStubParameter("pageRank.numDanglingVertices", String.valueOf(numDanglingVertices));
- tailConfig.setStubParameter("compensation.failingWorker", failingWorkers);
- tailConfig.setStubParameter("compensation.failingIteration", String.valueOf(failingIteration));
- tailConfig.setStubParameter("compensation.messageLoss", String.valueOf(messageLoss));
-
- // --------------- the output ---------------------
-
- OutputFormatVertex output = JobGraphUtils.createFileOutput(jobGraph, "FinalOutput", parallelism);
- TaskConfig outputConfig = new TaskConfig(output.getConfiguration());
- outputConfig.addInputToGroup(0);
- outputConfig.setInputSerializer(recSerializer, 0);
- outputConfig.setStubWrapper(new UserCodeClassWrapper<PageWithRankOutFormat>(PageWithRankOutFormat.class));
- outputConfig.setStubParameter(FileOutputFormat.FILE_PARAMETER_KEY, outputPath);
-
- // --------------- the auxiliaries ---------------------
-
- JobVertex sync = JobGraphUtils.createSync(jobGraph, parallelism);
- TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
- syncConfig.setNumberOfIterations(numIterations);
- syncConfig.addIterationAggregator(CompensatableDotProductCoGroup.AGGREGATOR_NAME, new PageRankStatsAggregator());
- syncConfig.setConvergenceCriterion(CompensatableDotProductCoGroup.AGGREGATOR_NAME, new DiffL1NormConvergenceCriterion());
- syncConfig.setIterationId(ITERATION_ID);
-
- // --------------- the wiring ---------------------
-
- JobGraphUtils.connect(pageWithRankInput, head, DistributionPattern.ALL_TO_ALL);
-
- JobGraphUtils.connect(head, intermediate, DistributionPattern.POINTWISE);
- intermediateConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
-
- JobGraphUtils.connect(adjacencyListInput, intermediate, DistributionPattern.ALL_TO_ALL);
-
- JobGraphUtils.connect(head, tail, DistributionPattern.POINTWISE);
- JobGraphUtils.connect(intermediate, tail, DistributionPattern.ALL_TO_ALL);
- tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
- tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(1, parallelism);
-
- JobGraphUtils.connect(head, output, DistributionPattern.POINTWISE);
-
- JobGraphUtils.connect(head, sync, DistributionPattern.POINTWISE);
-
- SlotSharingGroup sharingGroup = new SlotSharingGroup();
- pageWithRankInput.setSlotSharingGroup(sharingGroup);
- adjacencyListInput.setSlotSharingGroup(sharingGroup);
- head.setSlotSharingGroup(sharingGroup);
- intermediate.setSlotSharingGroup(sharingGroup);
- tail.setSlotSharingGroup(sharingGroup);
- output.setSlotSharingGroup(sharingGroup);
- sync.setSlotSharingGroup(sharingGroup);
-
- tail.setStrictlyCoLocatedWith(head);
- intermediate.setStrictlyCoLocatedWith(head);
-
- return jobGraph;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDotProductCoGroup.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDotProductCoGroup.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDotProductCoGroup.java
deleted file mode 100644
index bd1eeb9..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDotProductCoGroup.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.iterative.nephele.danglingpagerank;
-
-import java.util.Iterator;
-import java.util.Set;
-
-import org.apache.flink.api.java.record.functions.CoGroupFunction;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.iterative.nephele.ConfigUtils;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-@SuppressWarnings("deprecation")
-public class CompensatableDotProductCoGroup extends CoGroupFunction {
-
- private static final long serialVersionUID = 1L;
-
-
- public static final String AGGREGATOR_NAME = "pagerank.aggregator";
-
- private Record accumulator = new Record();
-
- private int workerIndex;
-
- private int currentIteration;
-
- private int failingIteration;
-
- private Set<Integer> failingWorkers;
-
- private PageRankStatsAggregator aggregator;
-
- private long numVertices;
-
- private long numDanglingVertices;
-
- private double dampingFactor;
-
- private double danglingRankFactor;
-
- private static final double BETA = 0.85;
-
- private final DoubleValue newRank = new DoubleValue();
-
- private BooleanValue isDangling = new BooleanValue();
-
- private LongValue vertexID = new LongValue();
-
- private DoubleValue doubleInstance = new DoubleValue();
-
- @Override
- public void open(Configuration parameters) {
- workerIndex = getRuntimeContext().getIndexOfThisSubtask();
- currentIteration = getIterationRuntimeContext().getSuperstepNumber();
-
- failingIteration = ConfigUtils.asInteger("compensation.failingIteration", parameters);
- failingWorkers = ConfigUtils.asIntSet("compensation.failingWorker", parameters);
- numVertices = ConfigUtils.asLong("pageRank.numVertices", parameters);
- numDanglingVertices = ConfigUtils.asLong("pageRank.numDanglingVertices", parameters);
-
- dampingFactor = (1d - BETA) / (double) numVertices;
-
- aggregator = getIterationRuntimeContext().getIterationAggregator(AGGREGATOR_NAME);
-
- if (currentIteration == 1) {
- danglingRankFactor = BETA * (double) numDanglingVertices / ((double) numVertices * (double) numVertices);
- } else {
- PageRankStats previousAggregate = getIterationRuntimeContext().getPreviousIterationAggregate(AGGREGATOR_NAME);
- danglingRankFactor = BETA * previousAggregate.danglingRank() / (double) numVertices;
- }
- }
-
- @Override
- public void coGroup(Iterator<Record> currentPageRankIterator, Iterator<Record> partialRanks, Collector<Record> collector) {
-
- if (!currentPageRankIterator.hasNext()) {
- long missingVertex = partialRanks.next().getField(0, LongValue.class).getValue();
- throw new IllegalStateException("No current page rank for vertex [" + missingVertex + "]!");
- }
-
- Record currentPageRank = currentPageRankIterator.next();
-
- long edges = 0;
- double summedRank = 0;
- while (partialRanks.hasNext()) {
- Record pr = partialRanks.next();
- summedRank += pr.getField(1, doubleInstance).getValue();
- edges++;
- }
-
- double rank = BETA * summedRank + dampingFactor + danglingRankFactor;
-
- double currentRank = currentPageRank.getField(1, doubleInstance).getValue();
- isDangling = currentPageRank.getField(2, isDangling);
-
- double danglingRankToAggregate = isDangling.get() ? rank : 0;
- long danglingVerticesToAggregate = isDangling.get() ? 1 : 0;
-
- double diff = Math.abs(currentRank - rank);
-
- aggregator.aggregate(diff, rank, danglingRankToAggregate, danglingVerticesToAggregate, 1, edges, summedRank, 0);
-
- newRank.setValue(rank);
-
- accumulator.setField(0, currentPageRank.getField(0, vertexID));
- accumulator.setField(1, newRank);
- accumulator.setField(2, isDangling);
-
- collector.collect(accumulator);
- }
-
- @Override
- public void close() throws Exception {
- if (currentIteration == failingIteration && failingWorkers.contains(workerIndex)) {
- aggregator.reset();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDotProductMatch.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDotProductMatch.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDotProductMatch.java
deleted file mode 100644
index 0508886..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDotProductMatch.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.iterative.nephele.danglingpagerank;
-
-import java.util.Random;
-import java.util.Set;
-
-import org.apache.flink.api.java.record.functions.JoinFunction;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.iterative.nephele.ConfigUtils;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-@SuppressWarnings("deprecation")
-public class CompensatableDotProductMatch extends JoinFunction {
-
- private static final long serialVersionUID = 1L;
-
- private Record record;
-
- private LongValue vertexID;
-
- private DoubleValue partialRank;
-
- private DoubleValue rank = new DoubleValue();
-
- private LongArrayView adjacentNeighbors = new LongArrayView();
-
- private int workerIndex;
-
- private int currentIteration;
-
- private int failingIteration;
-
- private Set<Integer> failingWorkers;
-
- private double messageLoss;
-
- private Random random;
-
- @Override
- public void open(Configuration parameters) {
- record = new Record();
- vertexID = new LongValue();
- partialRank = new DoubleValue();
-
- workerIndex = getRuntimeContext().getIndexOfThisSubtask();
- currentIteration = getIterationRuntimeContext().getSuperstepNumber();
- failingIteration = ConfigUtils.asInteger("compensation.failingIteration", parameters);
- failingWorkers = ConfigUtils.asIntSet("compensation.failingWorker", parameters);
- messageLoss = ConfigUtils.asDouble("compensation.messageLoss", parameters);
-
- random = new Random();
- }
-
- @Override
- public void join(Record pageWithRank, Record adjacencyList, Collector<Record> collector) {
-
- rank = pageWithRank.getField(1, rank);
- adjacentNeighbors = adjacencyList.getField(1, adjacentNeighbors);
- int numNeighbors = adjacentNeighbors.size();
-
- double rankToDistribute = rank.getValue() / (double) numNeighbors;
-
- partialRank.setValue(rankToDistribute);
- record.setField(1, partialRank);
-
- boolean isFailure = currentIteration == failingIteration && failingWorkers.contains(workerIndex);
-
- for (int n = 0; n < numNeighbors; n++) {
- vertexID.setValue(adjacentNeighbors.getQuick(n));
- record.setField(0, vertexID);
-
- if (isFailure) {
- if (random.nextDouble() >= messageLoss) {
- collector.collect(record);
- }
- } else {
- collector.collect(record);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatingMap.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatingMap.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatingMap.java
deleted file mode 100644
index d8189ef..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatingMap.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.iterative.nephele.danglingpagerank;
-
-import java.util.Set;
-
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.iterative.nephele.ConfigUtils;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-@SuppressWarnings("deprecation")
-public class CompensatingMap extends MapFunction {
-
- private static final long serialVersionUID = 1L;
-
- private int workerIndex;
-
- private int currentIteration;
-
- private long numVertices;
-
- private int failingIteration;
-
- private Set<Integer> failingWorkers;
-
- private double uniformRank;
-
- private double rescaleFactor;
-
- private DoubleValue rank = new DoubleValue();
-
- @Override
- public void open(Configuration parameters) {
-
- workerIndex = getRuntimeContext().getIndexOfThisSubtask();
- currentIteration = getIterationRuntimeContext().getSuperstepNumber();
- failingIteration = ConfigUtils.asInteger("compensation.failingIteration", parameters);
- failingWorkers = ConfigUtils.asIntSet("compensation.failingWorker", parameters);
- numVertices = ConfigUtils.asLong("pageRank.numVertices", parameters);
-
- if (currentIteration > 1) {
- PageRankStats stats = (PageRankStats) getIterationRuntimeContext().getPreviousIterationAggregate(
- CompensatableDotProductCoGroup.AGGREGATOR_NAME);
-
- uniformRank = 1d / (double) numVertices;
- double lostMassFactor = (numVertices - stats.numVertices()) / (double) numVertices;
- rescaleFactor = (1 - lostMassFactor) / stats.rank();
- }
- }
-
- @Override
- public void map(Record pageWithRank, Collector<Record> out) {
-
- if (currentIteration == failingIteration + 1) {
-
- rank = pageWithRank.getField(1, rank);
-
- if (failingWorkers.contains(workerIndex)) {
- rank.setValue(uniformRank);
- } else {
- rank.setValue(rank.getValue() * rescaleFactor);
- }
- pageWithRank.setField(1, rank);
- }
-
- out.collect(pageWithRank);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/DanglingPageGenerateRankInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/DanglingPageGenerateRankInputFormat.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/DanglingPageGenerateRankInputFormat.java
deleted file mode 100644
index 4da2bf2..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/DanglingPageGenerateRankInputFormat.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.iterative.nephele.danglingpagerank;
-
-import java.util.regex.Pattern;
-
-import org.apache.flink.api.java.record.io.TextInputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.iterative.nephele.ConfigUtils;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.Record;
-
-public class DanglingPageGenerateRankInputFormat extends TextInputFormat {
-
- private static final long serialVersionUID = 1L;
-
- private DoubleValue initialRank;
-
- private static final Pattern SEPARATOR = Pattern.compile("[, \t]");
-
- @Override
- public void configure(Configuration parameters) {
- long numVertices = ConfigUtils.asLong("pageRank.numVertices", parameters);
- initialRank = new DoubleValue(1 / (double) numVertices);
- super.configure(parameters);
- }
-
- @Override
- public Record readRecord(Record target, byte[] bytes, int offset, int numBytes) {
- String str = new String(bytes, offset, numBytes);
-
- String[] tokens = SEPARATOR.split(str);
-
- long vertexID = Long.parseLong(tokens[0]);
- boolean isDangling = tokens.length > 1 && Integer.parseInt(tokens[1]) == 1;
-
- target.clear();
- target.addField(new LongValue(vertexID));
- target.addField(initialRank);
- target.addField(new BooleanValue(isDangling));
-
- return target;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/DiffL1NormConvergenceCriterion.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/DiffL1NormConvergenceCriterion.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/DiffL1NormConvergenceCriterion.java
deleted file mode 100644
index 9868173..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/DiffL1NormConvergenceCriterion.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.iterative.nephele.danglingpagerank;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
-
-@SuppressWarnings("serial")
-public class DiffL1NormConvergenceCriterion implements ConvergenceCriterion<PageRankStats> {
-
- private static final double EPSILON = 0.00005;
-
- private static final Logger log = LoggerFactory.getLogger(DiffL1NormConvergenceCriterion.class);
-
- @Override
- public boolean isConverged(int iteration, PageRankStats pageRankStats) {
- double diff = pageRankStats.diff();
-
- if (log.isInfoEnabled()) {
- log.info("Stats in iteration [" + iteration + "]: " + pageRankStats);
- log.info("L1 norm of the vector difference is [" + diff + "] in iteration [" + iteration + "]");
- }
-
- return diff < EPSILON;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/ImprovedAdjacencyListInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/ImprovedAdjacencyListInputFormat.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/ImprovedAdjacencyListInputFormat.java
deleted file mode 100644
index df2fed6..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/ImprovedAdjacencyListInputFormat.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.iterative.nephele.danglingpagerank;
-
-import org.apache.flink.api.java.record.io.DelimitedInputFormat;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.Record;
-
-public class ImprovedAdjacencyListInputFormat extends DelimitedInputFormat {
- private static final long serialVersionUID = 1L;
-
- private final LongValue vertexID = new LongValue();
- private final AsciiLongArrayView arrayView = new AsciiLongArrayView();
- private final LongArrayView adjacentVertices = new LongArrayView();
-
- @Override
- public Record readRecord(Record target, byte[] bytes, int offset, int numBytes) {
-
- if (numBytes == 0) {
- return null;
- }
-
- arrayView.set(bytes, offset, numBytes);
-
- int numElements = arrayView.numElements();
- adjacentVertices.allocate(numElements - 1);
-
- try {
-
- int pos = 0;
- while (arrayView.next()) {
-
- if (pos == 0) {
- vertexID.setValue(arrayView.element());
- } else {
- adjacentVertices.setQuick(pos - 1, arrayView.element());
- }
-
- pos++;
- }
-
- //sanity check
- if (pos != numElements) {
- throw new IllegalStateException("Should have gotten " + numElements + " elements, but saw " + pos);
- }
-
- } catch (RuntimeException e) {
- throw new RuntimeException("Error parsing: " + arrayView.toString(), e);
- }
-
- target.clear();
- target.addField(vertexID);
- target.addField(adjacentVertices);
- return target;
- }
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/ImprovedDanglingPageRankInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/ImprovedDanglingPageRankInputFormat.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/ImprovedDanglingPageRankInputFormat.java
deleted file mode 100644
index cfa64c8..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/ImprovedDanglingPageRankInputFormat.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.iterative.nephele.danglingpagerank;
-
-import org.apache.flink.api.java.record.io.DelimitedInputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.iterative.nephele.ConfigUtils;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.Record;
-
-public class ImprovedDanglingPageRankInputFormat extends DelimitedInputFormat {
- private static final long serialVersionUID = 1L;
-
- private LongValue vertexID = new LongValue();
-
- private DoubleValue initialRank;
-
- private BooleanValue isDangling = new BooleanValue();
-
- private AsciiLongArrayView arrayView = new AsciiLongArrayView();
-
- private static final long DANGLING_MARKER = 1l;
-
- @Override
- public void configure(Configuration parameters) {
- long numVertices = ConfigUtils.asLong("pageRank.numVertices", parameters);
- initialRank = new DoubleValue(1 / (double) numVertices);
- super.configure(parameters);
- }
-
- @Override
- public Record readRecord(Record target, byte[] bytes, int offset, int numBytes) {
- arrayView.set(bytes, offset, numBytes);
-
- try {
- arrayView.next();
- vertexID.setValue(arrayView.element());
-
- if (arrayView.next()) {
- isDangling.set(arrayView.element() == DANGLING_MARKER);
- } else {
- isDangling.set(false);
- }
-
- } catch (NumberFormatException e) {
- throw new RuntimeException("Error parsing " + arrayView.toString(), e);
- }
-
- target.clear();
- target.addField(vertexID);
- target.addField(initialRank);
- target.addField(isDangling);
- return target;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/LongArrayView.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/LongArrayView.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/LongArrayView.java
deleted file mode 100644
index 825b210..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/LongArrayView.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.iterative.nephele.danglingpagerank;
-
-import java.io.IOException;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.types.Value;
-
-public class LongArrayView implements Value {
- private static final long serialVersionUID = 1L;
-
- private long[] entries = new long[0];
- private int numEntries = 0;
-
- public LongArrayView() {
- }
-
- public long get(int index) {
- if (index >= numEntries) {
- throw new ArrayIndexOutOfBoundsException();
- }
- return getQuick(index);
- }
-
- public long getQuick(int index) {
- return entries[index];
- }
-
- public void allocate(int numEntries) {
- this.numEntries = numEntries;
- ensureCapacity();
- }
-
- public void set(int index, long value) {
- if (index >= numEntries) {
- throw new ArrayIndexOutOfBoundsException();
- }
- setQuick(index, value);
- }
-
- public void setQuick(int index, long value) {
- entries[index] = value;
- }
-
- public int size() {
- return numEntries;
- }
-
- private void ensureCapacity() {
- if (entries.length < numEntries) {
- entries = new long[numEntries];
- }
- }
-
- public void write(DataOutputView out) throws IOException {
- out.writeInt(numEntries);
- for (int n = 0; n < numEntries; n++) {
- out.writeLong(entries[n]);
- }
- }
-
- public void read(DataInputView in) throws IOException {
- numEntries = in.readInt();
- ensureCapacity();
- for (int n = 0; n < numEntries; n++) {
- entries[n] = in.readLong();
- }
- }
-}