Posted to commits@flink.apache.org by se...@apache.org on 2015/08/30 23:44:38 UTC

[3/5] flink git commit: [FLINK-1681] [tests] Remove outdated 'nephele' iteration tests.

http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingComparator.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingComparator.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingComparator.java
deleted file mode 100644
index d83c3fb..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingComparator.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.iterative.nephele.customdanglingpagerank.types;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.base.LongComparator;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.MemorySegment;
-
-public final class VertexWithRankAndDanglingComparator extends TypeComparator<VertexWithRankAndDangling> {
-
-	private static final long serialVersionUID = 1L;
-	
-	private long reference;
-
-	@SuppressWarnings("rawtypes")
-	private TypeComparator[] comparators = new TypeComparator[]{new LongComparator(true)};
-
-	@Override
-	public int hash(VertexWithRankAndDangling record) {
-		final long value = record.getVertexID();
-		return 43 + (int) (value ^ value >>> 32);
-	}
-
-	@Override
-	public void setReference(VertexWithRankAndDangling toCompare) {
-		this.reference = toCompare.getVertexID();
-	}
-
-	@Override
-	public boolean equalToReference(VertexWithRankAndDangling candidate) {
-		return candidate.getVertexID() == this.reference;
-	}
-
-	@Override
-	public int compareToReference(TypeComparator<VertexWithRankAndDangling> referencedComparator) {
-		VertexWithRankAndDanglingComparator comp = (VertexWithRankAndDanglingComparator) referencedComparator;
-		final long diff = comp.reference - this.reference;
-		return diff < 0 ? -1 : diff > 0 ? 1 : 0;
-	}
-	
-	@Override
-	public int compare(VertexWithRankAndDangling first, VertexWithRankAndDangling second) {
-		final long diff = first.getVertexID() - second.getVertexID();
-		return diff < 0 ? -1 : diff > 0 ? 1 : 0;
-	}
-
-	@Override
-	public int compareSerialized(DataInputView source1, DataInputView source2) throws IOException {
-		final long diff = source1.readLong() - source2.readLong();
-		return diff < 0 ? -1 : diff > 0 ? 1 : 0;
-	}
-
-	@Override
-	public boolean supportsNormalizedKey() {
-		return true;
-	}
-
-	@Override
-	public int getNormalizeKeyLen() {
-		return 8;
-	}
-
-	@Override
-	public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
-		return keyBytes < 8;
-	}
-
-	@Override
-	public void putNormalizedKey(VertexWithRankAndDangling record, MemorySegment target, int offset, int len) {
-		final long value = record.getVertexID() - Long.MIN_VALUE;
-		
-		// see IntValue for an explanation of the logic
-		if (len == 8) {
-			// default case, full normalized key
-			target.putLongBigEndian(offset, value);
-		}
-		else if (len <= 0) {
-		}
-		else if (len < 8) {
-			for (int i = 0; len > 0; len--, i++) {
-				target.put(offset + i, (byte) ((value >>> ((3-i)<<3)) & 0xff));
-			}
-		}
-		else {
-			target.putLongBigEndian(offset, value);
-			for (int i = 8; i < len; i++) {
-				target.put(offset + i, (byte) 0);
-			}
-		}
-	}
-
-	@Override
-	public boolean invertNormalizedKey() {
-		return false;
-	}
-	
-	@Override
-	public boolean supportsSerializationWithKeyNormalization() {
-		return true;
-	}
-
-	@Override
-	public void writeWithKeyNormalization(VertexWithRankAndDangling record, DataOutputView target) throws IOException {
-		target.writeLong(record.getVertexID() - Long.MIN_VALUE);
-		target.writeDouble(record.getRank());
-		target.writeBoolean(record.isDangling());
-	}
-
-	@Override
-	public VertexWithRankAndDangling readWithKeyDenormalization(VertexWithRankAndDangling reuse, DataInputView source) throws IOException {
-		reuse.setVertexID(source.readLong() + Long.MIN_VALUE);
-		reuse.setRank(source.readDouble());
-		reuse.setDangling(source.readBoolean());
-		return reuse;
-	}
-
-	@Override
-	public VertexWithRankAndDanglingComparator duplicate() {
-		return new VertexWithRankAndDanglingComparator();
-	}
-
-	@Override
-	public int extractKeys(Object record, Object[] target, int index) {
-		target[index] = ((VertexWithRankAndDangling) record).getVertexID();
-		return 1;
-	}
-
-	@Override
-	@SuppressWarnings("rawtypes")
-	public TypeComparator[] getFlatComparators() {
-		return comparators;
-	}
-}
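
A note on the normalized-key logic above: putNormalizedKey shifts the signed vertex ID into the unsigned range by subtracting Long.MIN_VALUE and writes it big-endian, so that a plain byte-wise comparison of the key bytes orders records the same way as comparing the longs. (The truncated-prefix branch shifts by (3-i)<<3, which appears to be carried over from the 4-byte IntValue logic it references; the 8-byte analogue is (7-i)<<3.) A minimal, self-contained sketch of the ordering property, using standard Java only; the class and method names below are illustrative, not part of the removed test code:

public class NormalizedLongKeyDemo {

    // Encode a signed long as 8 big-endian bytes whose byte-wise
    // (unsigned, lexicographic) order matches the signed order of the input.
    static byte[] normalizedKey(long signedValue) {
        long v = signedValue - Long.MIN_VALUE;   // flip into unsigned range
        byte[] key = new byte[8];
        for (int i = 0; i < 8; i++) {
            key[i] = (byte) (v >>> ((7 - i) << 3));
        }
        return key;
    }

    // Unsigned lexicographic comparison of two 8-byte keys.
    static int compareKeys(byte[] a, byte[] b) {
        for (int i = 0; i < 8; i++) {
            int cmp = (a[i] & 0xff) - (b[i] & 0xff);
            if (cmp != 0) {
                return cmp;
            }
        }
        return 0;
    }

    public static void main(String[] args) {
        long[] samples = {Long.MIN_VALUE, -42L, -1L, 0L, 1L, 42L, Long.MAX_VALUE};
        for (int i = 0; i < samples.length - 1; i++) {
            boolean ordered = compareKeys(normalizedKey(samples[i]), normalizedKey(samples[i + 1])) < 0;
            System.out.println(samples[i] + " < " + samples[i + 1] + " as key bytes: " + ordered);
        }
    }
}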

http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingComparatorFactory.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingComparatorFactory.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingComparatorFactory.java
deleted file mode 100644
index 532ca3e..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingComparatorFactory.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.iterative.nephele.customdanglingpagerank.types;
-
-import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
-import org.apache.flink.configuration.Configuration;
-
-/**
- *
- */
-public final class VertexWithRankAndDanglingComparatorFactory implements TypeComparatorFactory<VertexWithRankAndDangling> {
-	
-	@Override
-	public void writeParametersToConfig(Configuration config) {}
-
-	@Override
-	public void readParametersFromConfig(Configuration config, ClassLoader cl) throws ClassNotFoundException {}
-
-	@Override
-	public VertexWithRankAndDanglingComparator createComparator() {
-		return new VertexWithRankAndDanglingComparator();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingSerializer.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingSerializer.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingSerializer.java
deleted file mode 100644
index 8ff0233..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingSerializer.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.iterative.nephele.customdanglingpagerank.types;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-public final class VertexWithRankAndDanglingSerializer extends TypeSerializerSingleton<VertexWithRankAndDangling> {
-
-	private static final long serialVersionUID = 1L;
-	
-	
-	@Override
-	public boolean isImmutableType() {
-		return false;
-	}
-
-	@Override
-	public VertexWithRankAndDangling createInstance() {
-		return new VertexWithRankAndDangling();
-	}
-
-	@Override
-	public VertexWithRankAndDangling copy(VertexWithRankAndDangling from) {
-		return new VertexWithRankAndDangling(from.getVertexID(), from.getRank(), from.isDangling());
-	}
-	
-	@Override
-	public VertexWithRankAndDangling copy(VertexWithRankAndDangling from, VertexWithRankAndDangling reuse) {
-		reuse.setVertexID(from.getVertexID());
-		reuse.setRank(from.getRank());
-		reuse.setDangling(from.isDangling());
-		return reuse;
-	}
-
-	@Override
-	public int getLength() {
-		return 17;
-	}
-
-	@Override
-	public void serialize(VertexWithRankAndDangling record, DataOutputView target) throws IOException {
-		target.writeLong(record.getVertexID());
-		target.writeDouble(record.getRank());
-		target.writeBoolean(record.isDangling());
-	}
-
-	@Override
-	public VertexWithRankAndDangling deserialize(DataInputView source) throws IOException {
-		return new VertexWithRankAndDangling(source.readLong(), source.readDouble(), source.readBoolean());
-	}
-	
-	@Override
-	public VertexWithRankAndDangling deserialize(VertexWithRankAndDangling target, DataInputView source) throws IOException {
-		target.setVertexID(source.readLong());
-		target.setRank(source.readDouble());
-		target.setDangling(source.readBoolean());
-		return target;
-	}
-
-	@Override
-	public void copy(DataInputView source, DataOutputView target) throws IOException {
-		target.write(source, 17);
-	}
-}
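
The serializer above fixes the on-the-wire layout at 17 bytes per record: an 8-byte vertex ID, an 8-byte rank, and a 1-byte dangling flag, which is what getLength() == 17 and the copy(DataInputView, DataOutputView) forwarding of exactly 17 bytes rely on. A small stand-alone round-trip sketch with plain java.io streams; the removed VertexWithRankAndDangling type is represented here by local variables:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class FixedLayoutRoundTrip {

    public static void main(String[] args) throws IOException {
        long vertexId = 7L;
        double rank = 0.15;
        boolean dangling = true;

        // write: 8 bytes (long) + 8 bytes (double) + 1 byte (boolean) = 17 bytes
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeLong(vertexId);
        out.writeDouble(rank);
        out.writeBoolean(dangling);
        out.flush();
        System.out.println("serialized length = " + bytes.size()); // 17

        // read back in the same field order
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
        System.out.println(in.readLong() + " " + in.readDouble() + " " + in.readBoolean());
    }
}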

http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingSerializerFactory.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingSerializerFactory.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingSerializerFactory.java
deleted file mode 100644
index 7f40d72..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingSerializerFactory.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.iterative.nephele.customdanglingpagerank.types;
-
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.configuration.Configuration;
-
-
-public final class VertexWithRankAndDanglingSerializerFactory implements TypeSerializerFactory<VertexWithRankAndDangling> {
-
-	private static final VertexWithRankAndDanglingSerializer INSTANCE = new VertexWithRankAndDanglingSerializer();
-	
-	@Override
-	public void writeParametersToConfig(Configuration config) {}
-
-	@Override
-	public void readParametersFromConfig(Configuration config, ClassLoader cl) throws ClassNotFoundException {}
-
-	@Override
-	public VertexWithRankAndDanglingSerializer getSerializer() {
-		return INSTANCE;
-	}
-
-	@Override
-	public Class<VertexWithRankAndDangling> getDataType() {
-		return VertexWithRankAndDangling.class;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public int hashCode() {
-		return 1;
-	}
-	
-	@Override
-	public boolean equals(Object obj) {
-		return obj != null && obj.getClass() == VertexWithRankAndDanglingSerializerFactory.class;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankComparator.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankComparator.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankComparator.java
deleted file mode 100644
index 9107f4b..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankComparator.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.iterative.nephele.customdanglingpagerank.types;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.base.LongComparator;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.MemorySegment;
-
-public final class VertexWithRankComparator extends TypeComparator<VertexWithRank> {
-
-	private static final long serialVersionUID = 1L;
-	
-	private long reference;
-
-	@SuppressWarnings("rawtypes")
-	private TypeComparator[] comparators = new TypeComparator[]{new LongComparator(true)};
-
-	@Override
-	public int hash(VertexWithRank record) {
-		final long value = record.getVertexID();
-		return 43 + (int) (value ^ value >>> 32);
-	}
-
-	@Override
-	public void setReference(VertexWithRank toCompare) {
-		this.reference = toCompare.getVertexID();
-	}
-
-	@Override
-	public boolean equalToReference(VertexWithRank candidate) {
-		return candidate.getVertexID() == this.reference;
-	}
-
-	@Override
-	public int compareToReference(TypeComparator<VertexWithRank> referencedComparator) {
-		VertexWithRankComparator comp = (VertexWithRankComparator) referencedComparator;
-		final long diff = comp.reference - this.reference;
-		return diff < 0 ? -1 : diff > 0 ? 1 : 0;
-	}
-	
-	@Override
-	public int compare(VertexWithRank first, VertexWithRank second) {
-		final long diff = first.getVertexID() - second.getVertexID();
-		return diff < 0 ? -1 : diff > 0 ? 1 : 0;
-	}
-
-	@Override
-	public int compareSerialized(DataInputView source1, DataInputView source2) throws IOException {
-		final long diff = source1.readLong() - source2.readLong();
-		return diff < 0 ? -1 : diff > 0 ? 1 : 0;
-	}
-
-	@Override
-	public boolean supportsNormalizedKey() {
-		return true;
-	}
-
-	@Override
-	public int getNormalizeKeyLen() {
-		return 8;
-	}
-
-	@Override
-	public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
-		return keyBytes < 8;
-	}
-
-	@Override
-	public void putNormalizedKey(VertexWithRank record, MemorySegment target, int offset, int len) {
-		final long value = record.getVertexID() - Long.MIN_VALUE;
-		
-		// see IntValue for an explanation of the logic
-		if (len == 8) {
-			// default case, full normalized key
-			target.putLongBigEndian(offset, value);
-		}
-		else if (len <= 0) {
-		}
-		else if (len < 8) {
-			for (int i = 0; len > 0; len--, i++) {
-				target.put(offset + i, (byte) ((value >>> ((3-i)<<3)) & 0xff));
-			}
-		}
-		else {
-			target.putLongBigEndian(offset, value);
-			for (int i = 8; i < len; i++) {
-				target.put(offset + i, (byte) 0);
-			}
-		}
-	}
-
-	@Override
-	public boolean invertNormalizedKey() {
-		return false;
-	}
-	
-	@Override
-	public boolean supportsSerializationWithKeyNormalization() {
-		return true;
-	}
-
-	@Override
-	public void writeWithKeyNormalization(VertexWithRank record, DataOutputView target) throws IOException {
-		target.writeLong(record.getVertexID() - Long.MIN_VALUE);
-		target.writeDouble(record.getRank());
-	}
-
-	@Override
-	public VertexWithRank readWithKeyDenormalization(VertexWithRank reuse, DataInputView source) throws IOException {
-		reuse.setVertexID(source.readLong() + Long.MIN_VALUE);
-		reuse.setRank(source.readDouble());
-		return reuse;
-	}
-
-	@Override
-	public VertexWithRankComparator duplicate() {
-		return new VertexWithRankComparator();
-	}
-
-	@Override
-	public int extractKeys(Object record, Object[] target, int index) {
-		target[index] = ((VertexWithRank) record).getVertexID();
-		return 1;
-	}
-
-	@Override
-	@SuppressWarnings("rawtypes")
-	public TypeComparator[] getFlatComparators() {
-		return comparators;
-	}
-}
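
Like its dangling counterpart, the comparator above implements compare() and compareSerialized() with the subtraction idiom (diff < 0 ? -1 : diff > 0 ? 1 : 0 over a raw difference of the two IDs). That works while vertex IDs stay well inside the long range, but the subtraction can overflow for operands near the extremes. A short sketch of the effect and of the overflow-safe Long.compare alternative (illustrative only, not part of the removed classes):

public class LongCompareDemo {

    // Subtraction-based comparison, as in the comparators above.
    static int compareBySubtraction(long a, long b) {
        long diff = a - b;
        return diff < 0 ? -1 : diff > 0 ? 1 : 0;
    }

    public static void main(String[] args) {
        long a = Long.MAX_VALUE;
        long b = -1L;
        // a > b, but a - b overflows to Long.MIN_VALUE and the sign flips
        System.out.println("subtraction:  " + compareBySubtraction(a, b)); // -1 (wrong sign)
        System.out.println("Long.compare: " + Long.compare(a, b));         // 1
    }
}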

http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankComparatorFactory.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankComparatorFactory.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankComparatorFactory.java
deleted file mode 100644
index 33ddb82..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankComparatorFactory.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.iterative.nephele.customdanglingpagerank.types;
-
-import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
-import org.apache.flink.configuration.Configuration;
-
-/**
- *
- */
-public final class VertexWithRankComparatorFactory implements TypeComparatorFactory<VertexWithRank> {
-	
-	@Override
-	public void writeParametersToConfig(Configuration config) {}
-
-	@Override
-	public void readParametersFromConfig(Configuration config, ClassLoader cl) throws ClassNotFoundException {}
-
-	@Override
-	public VertexWithRankComparator createComparator() {
-		return new VertexWithRankComparator();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankDanglingToVertexWithAdjacencyListPairComparatorFactory.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankDanglingToVertexWithAdjacencyListPairComparatorFactory.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankDanglingToVertexWithAdjacencyListPairComparatorFactory.java
deleted file mode 100644
index 802fa16..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankDanglingToVertexWithAdjacencyListPairComparatorFactory.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.iterative.nephele.customdanglingpagerank.types;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
-
-
-/**
- *
- */
-public class VertexWithRankDanglingToVertexWithAdjacencyListPairComparatorFactory
-	implements TypePairComparatorFactory<VertexWithRankAndDangling, VertexWithAdjacencyList>
-{
-	
-	@Override
-	public VertexWithRankDanglingToVertexWithAdjacencyListPairComparator createComparator12(
-			TypeComparator<VertexWithRankAndDangling> comparator1, TypeComparator<VertexWithAdjacencyList> comparator2)
-	{
-		return new VertexWithRankDanglingToVertexWithAdjacencyListPairComparator();
-	}
-
-	@Override
-	public VertexWithAdjacencyListToVertexWithRankDanglingPairComparator createComparator21(
-			TypeComparator<VertexWithRankAndDangling> comparator1, TypeComparator<VertexWithAdjacencyList> comparator2)
-	{
-		return new VertexWithAdjacencyListToVertexWithRankDanglingPairComparator();
-	}
-	
-
-	public static final class VertexWithRankDanglingToVertexWithAdjacencyListPairComparator
-		extends TypePairComparator<VertexWithRankAndDangling, VertexWithAdjacencyList>
-	{
-		private long reference;
-		
-		@Override
-		public void setReference(VertexWithRankAndDangling reference) {
-			this.reference = reference.getVertexID();
-		}
-		
-		@Override
-		public boolean equalToReference(VertexWithAdjacencyList candidate) {
-			return this.reference == candidate.getVertexID();
-		}
-	
-		@Override
-		public int compareToReference(VertexWithAdjacencyList candidate) {
-			long diff = candidate.getVertexID() - this.reference;
-			return diff < 0 ? -1 : diff > 0 ? 1 : 0;
-		}
-	}
-	
-	public static final class VertexWithAdjacencyListToVertexWithRankDanglingPairComparator
-		extends TypePairComparator<VertexWithAdjacencyList, VertexWithRankAndDangling>
-	{
-		private long reference;
-		
-		@Override
-		public void setReference(VertexWithAdjacencyList reference) {
-			this.reference = reference.getVertexID();
-		}
-		
-		@Override
-		public boolean equalToReference(VertexWithRankAndDangling candidate) {
-			return this.reference == candidate.getVertexID();
-		}
-	
-		@Override
-		public int compareToReference(VertexWithRankAndDangling candidate) {
-			long diff = candidate.getVertexID() - this.reference;
-			return diff < 0 ? -1 : diff > 0 ? 1 : 0;
-		}
-	}
-}
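
The factory above produces the two asymmetric pair comparators that a join between VertexWithRankAndDangling and VertexWithAdjacencyList needs: set the reference key from a record of one side, then probe records of the other side against it. A self-contained sketch of that set-reference/probe pattern on two unrelated record types keyed by a shared long ID (all names here are illustrative, not the removed types):

public class PairComparatorSketch {

    static final class RankRecord {
        final long vertexId;
        final double rank;
        RankRecord(long vertexId, double rank) { this.vertexId = vertexId; this.rank = rank; }
    }

    static final class AdjacencyRecord {
        final long vertexId;
        final long[] neighbors;
        AdjacencyRecord(long vertexId, long[] neighbors) { this.vertexId = vertexId; this.neighbors = neighbors; }
    }

    // Minimal analogue of the set-reference / probe contract of a TypePairComparator.
    static final class RankToAdjacencyPairComparator {
        private long reference;

        void setReference(RankRecord ref) { this.reference = ref.vertexId; }

        boolean equalToReference(AdjacencyRecord candidate) { return candidate.vertexId == reference; }

        int compareToReference(AdjacencyRecord candidate) { return Long.compare(candidate.vertexId, reference); }
    }

    public static void main(String[] args) {
        RankToAdjacencyPairComparator cmp = new RankToAdjacencyPairComparator();
        cmp.setReference(new RankRecord(42L, 0.15));
        System.out.println(cmp.equalToReference(new AdjacencyRecord(42L, new long[] {1, 2}))); // true
        System.out.println(cmp.compareToReference(new AdjacencyRecord(7L, new long[0])));      // -1
    }
}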

http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankDanglingToVertexWithRankPairComparatorFactory.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankDanglingToVertexWithRankPairComparatorFactory.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankDanglingToVertexWithRankPairComparatorFactory.java
deleted file mode 100644
index 674d85a..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankDanglingToVertexWithRankPairComparatorFactory.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.iterative.nephele.customdanglingpagerank.types;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
-
-
-/**
- *
- */
-public class VertexWithRankDanglingToVertexWithRankPairComparatorFactory
-	implements TypePairComparatorFactory<VertexWithRankAndDangling, VertexWithRank>
-{
-	
-	@Override
-	public VertexWithRankDanglingToVertexWithRankComparator createComparator12(
-			TypeComparator<VertexWithRankAndDangling> comparator1, TypeComparator<VertexWithRank> comparator2)
-	{
-		return new VertexWithRankDanglingToVertexWithRankComparator();
-	}
-
-	@Override
-	public VertexWithRankToVertexWithRankDanglingPairComparator createComparator21(
-			TypeComparator<VertexWithRankAndDangling> comparator1, TypeComparator<VertexWithRank> comparator2)
-	{
-		return new VertexWithRankToVertexWithRankDanglingPairComparator();
-	}
-	
-
-	public static final class VertexWithRankDanglingToVertexWithRankComparator
-		extends TypePairComparator<VertexWithRankAndDangling, VertexWithRank>
-	{
-		private long reference;
-		
-		@Override
-		public void setReference(VertexWithRankAndDangling reference) {
-			this.reference = reference.getVertexID();
-		}
-		
-		@Override
-		public boolean equalToReference(VertexWithRank candidate) {
-			return this.reference == candidate.getVertexID();
-		}
-	
-		@Override
-		public int compareToReference(VertexWithRank candidate) {
-			long diff = candidate.getVertexID() - this.reference;
-			return diff < 0 ? -1 : diff > 0 ? 1 : 0;
-		}
-	}
-	
-	public static final class VertexWithRankToVertexWithRankDanglingPairComparator
-		extends TypePairComparator<VertexWithRank, VertexWithRankAndDangling>
-	{
-		private long reference;
-		
-		@Override
-		public void setReference(VertexWithRank reference) {
-			this.reference = reference.getVertexID();
-		}
-		
-		@Override
-		public boolean equalToReference(VertexWithRankAndDangling candidate) {
-			return this.reference == candidate.getVertexID();
-		}
-	
-		@Override
-		public int compareToReference(VertexWithRankAndDangling candidate) {
-			long diff = candidate.getVertexID() - this.reference;
-			return diff < 0 ? -1 : diff > 0 ? 1 : 0;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankSerializer.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankSerializer.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankSerializer.java
deleted file mode 100644
index 2c3abcd..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankSerializer.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.iterative.nephele.customdanglingpagerank.types;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-public final class VertexWithRankSerializer extends TypeSerializerSingleton<VertexWithRank> {
-
-	private static final long serialVersionUID = 1L;
-	
-	
-	@Override
-	public boolean isImmutableType() {
-		return false;
-	}
-
-	@Override
-	public VertexWithRank createInstance() {
-		return new VertexWithRank();
-	}
-	
-	@Override
-	public VertexWithRank copy(VertexWithRank from) {
-		return new VertexWithRank(from.getVertexID(), from.getRank());
-	}
-
-	@Override
-	public VertexWithRank copy(VertexWithRank from, VertexWithRank reuse) {
-		reuse.setVertexID(from.getVertexID());
-		reuse.setRank(from.getRank());
-		return reuse;
-	}
-
-	@Override
-	public int getLength() {
-		return 16;
-	}
-
-	@Override
-	public void serialize(VertexWithRank record, DataOutputView target) throws IOException {
-		target.writeLong(record.getVertexID());
-		target.writeDouble(record.getRank());
-	}
-
-	@Override
-	public VertexWithRank deserialize(DataInputView source) throws IOException {
-		return new VertexWithRank(source.readLong(), source.readDouble());
-	}
-	
-	@Override
-	public VertexWithRank deserialize(VertexWithRank target, DataInputView source) throws IOException {
-		target.setVertexID(source.readLong());
-		target.setRank(source.readDouble());
-		return target;
-	}
-
-	@Override
-	public void copy(DataInputView source, DataOutputView target) throws IOException {
-		target.write(source, 16);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankSerializerFactory.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankSerializerFactory.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankSerializerFactory.java
deleted file mode 100644
index 67ce028..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankSerializerFactory.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.iterative.nephele.customdanglingpagerank.types;
-
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.configuration.Configuration;
-
-
-public final class VertexWithRankSerializerFactory implements TypeSerializerFactory<VertexWithRank> {
-
-	private static final VertexWithRankSerializer INSTANCE = new VertexWithRankSerializer();
-	
-	@Override
-	public void writeParametersToConfig(Configuration config) {}
-
-	@Override
-	public void readParametersFromConfig(Configuration config, ClassLoader cl) throws ClassNotFoundException {}
-
-	@Override
-	public VertexWithRankSerializer getSerializer() {
-		return INSTANCE;
-	}
-
-	@Override
-	public Class<VertexWithRank> getDataType() {
-		return VertexWithRank.class;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public int hashCode() {
-		return 1;
-	}
-	
-	@Override
-	public boolean equals(Object obj) {
-		return obj != null && obj.getClass() == VertexWithRankSerializerFactory.class;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/AsciiLongArrayView.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/AsciiLongArrayView.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/AsciiLongArrayView.java
deleted file mode 100644
index 05b3f9b..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/AsciiLongArrayView.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.iterative.nephele.danglingpagerank;
-
-import java.io.Serializable;
-
-import com.google.common.base.Charsets;
-
-public class AsciiLongArrayView implements Serializable {
-  private static final long serialVersionUID = 1L;
-
-  private byte[] buffer;
-
-  private int offset;
-  private int numBytes;
-
-  private int tokenOffset;
-  private int tokenNumBytes;
-
-  private static final int NOT_SET = -1;
-
-  private static final int RADIX_TEN = 10;
-  private static final long MULTMIN_RADIX_TEN = Long.MIN_VALUE / 10;
-  private static final long N_MULTMAX_RADIX_TEN = -Long.MAX_VALUE / 10;
-
-  public void set(byte[] buffer, int offset, int numBytes) {
-    this.buffer = buffer;
-    this.offset = offset;
-    this.numBytes = numBytes;
-
-    this.tokenOffset = NOT_SET;
-    checkForSingleTrailingWhitespace();
-  }
-
-  private void checkForSingleTrailingWhitespace() {
-    if (Character.isWhitespace((char) buffer[offset + numBytes -1])) {
-      numBytes--;
-    }
-  }
-
-  public int numElements() {
-    int matches = 0;
-    int pos = offset;
-    while (pos < offset + numBytes) {
-      if (Character.isWhitespace((char) buffer[pos])) {
-        matches++;
-      }
-      pos++;
-    }
-    return matches + 1;
-  }
-
-  public boolean next() {
-
-    if (tokenOffset == NOT_SET) {
-      tokenOffset = offset;
-    } else {
-      tokenOffset += tokenNumBytes + 1;
-      if (tokenOffset > offset + numBytes) {
-        return false;
-      }
-    }
-
-    tokenNumBytes = 1;
-    while (true) {
-      int candidatePos = tokenOffset + tokenNumBytes;
-      if (candidatePos >= offset + numBytes || Character.isWhitespace((char) buffer[candidatePos])) {
-        break;
-      }
-      tokenNumBytes++;
-    }
-
-    return true;
-  }
-
-  private char tokenCharAt(int pos) {
-    return (char) buffer[tokenOffset + pos];
-  }
-
-  public long element() {
-
-    long result = 0;
-    boolean negative = false;
-    int i = 0, max = tokenNumBytes;
-    long limit;
-    long multmin;
-    int digit;
-
-    if (max > 0) {
-      if (tokenCharAt(0) == '-') {
-        negative = true;
-        limit = Long.MIN_VALUE;
-        i++;
-      } else {
-        limit = -Long.MAX_VALUE;
-      }
-
-      multmin = negative ? MULTMIN_RADIX_TEN : N_MULTMAX_RADIX_TEN;
-
-      if (i < max) {
-        digit = Character.digit(tokenCharAt(i++), RADIX_TEN);
-        if (digit < 0) {
-          throw new NumberFormatException(toString());
-        } else {
-          result = -digit;
-        }
-      }
-      while (i < max) {
-        // Accumulating negatively avoids surprises near MAX_VALUE
-        digit = Character.digit(tokenCharAt(i++), RADIX_TEN);
-        if (digit < 0) {
-          throw new NumberFormatException(toString());
-        }
-        if (result < multmin) {
-          throw new NumberFormatException(toString());
-        }
-        result *= RADIX_TEN;
-        if (result < limit + digit) {
-          throw new NumberFormatException(toString());
-        }
-        result -= digit;
-      }
-    } else {
-      throw new NumberFormatException(toString());
-    }
-    if (negative) {
-      if (i > 1) {
-        return result;
-      } else { /* Only got "-" */
-        throw new NumberFormatException(toString());
-      }
-    } else {
-      return -result;
-    }
-  }
-
-//  public double elementAsDouble() {
-//    String token = new String(buffer, tokenOffset, tokenNumBytes, Charsets.US_ASCII);
-//    return Double.valueOf(token);
-//  }
-
-
-  @Override
-  public String toString() {
-    return "[" + new String(buffer, offset, numBytes, Charsets.US_ASCII) + "] (buffer length: " + buffer.length +
-        ", offset: " + offset + ", numBytes: " + numBytes + ")";
-  }
-
-}
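
AsciiLongArrayView above walks a byte range, splits it on whitespace, and parses each token into a long by accumulating negatively, the same trick Long.parseLong uses so that Long.MIN_VALUE (whose magnitude does not fit into a positive long) still parses. A stand-alone sketch of the tokenize-and-parse step over an ASCII byte buffer, using only the standard library (illustrative, not the removed class):

import java.nio.charset.StandardCharsets;

public class AsciiLongParseDemo {

    public static void main(String[] args) {
        byte[] buffer = "17 -3 9223372036854775807 -9223372036854775808"
                .getBytes(StandardCharsets.US_ASCII);

        int start = 0;
        for (int pos = 0; pos <= buffer.length; pos++) {
            boolean atEnd = pos == buffer.length;
            if (atEnd || Character.isWhitespace((char) buffer[pos])) {
                if (pos > start) {
                    // Long.parseLong accumulates negatively internally, so even
                    // Long.MIN_VALUE round-trips although +9223372036854775808 would not fit.
                    String token = new String(buffer, start, pos - start, StandardCharsets.US_ASCII);
                    System.out.println(Long.parseLong(token));
                }
                start = pos + 1;
            }
        }
    }
}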

http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/BooleanValue.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/BooleanValue.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/BooleanValue.java
deleted file mode 100644
index af04557..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/BooleanValue.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.iterative.nephele.danglingpagerank;
-
-import java.io.IOException;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.types.Value;
-
-public class BooleanValue implements Value {
-  private static final long serialVersionUID = 1L;
-
-  private boolean value;
-
-  public BooleanValue(boolean value) {
-    this.value = value;
-  }
-
-  public BooleanValue() {
-  }
-
-  public boolean get() {
-    return value;
-  }
-
-  public void set(boolean value) {
-    this.value = value;
-  }
-
-  @Override
-  public void write(DataOutputView out) throws IOException {
-    out.writeBoolean(value);
-  }
-
-  @Override
-  public void read(DataInputView in) throws IOException {
-    value = in.readBoolean();
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java
deleted file mode 100644
index 78038b3..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java
+++ /dev/null
@@ -1,295 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.iterative.nephele.danglingpagerank;
-
-import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
-import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
-import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.api.common.typeutils.record.RecordComparatorFactory;
-import org.apache.flink.api.common.typeutils.record.RecordPairComparatorFactory;
-import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory;
-import org.apache.flink.api.java.record.io.FileOutputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.iterative.task.IterationHeadPactTask;
-import org.apache.flink.runtime.iterative.task.IterationIntermediatePactTask;
-import org.apache.flink.runtime.iterative.task.IterationTailPactTask;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.DistributionPattern;
-import org.apache.flink.runtime.jobgraph.InputFormatVertex;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.OutputFormatVertex;
-import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.operators.BuildSecondCachedJoinDriver;
-import org.apache.flink.runtime.operators.CoGroupDriver;
-import org.apache.flink.runtime.operators.CollectorMapDriver;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.apache.flink.runtime.operators.util.TaskConfig;
-import org.apache.flink.test.iterative.nephele.JobGraphUtils;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.util.OperatingSystem;
-
-public class CompensatableDanglingPageRank {
-	
-	private static final TypeSerializerFactory<?> recSerializer = RecordSerializerFactory.get();
-	
-	@SuppressWarnings("unchecked")
-	private static final TypeComparatorFactory<?> fieldZeroComparator = new RecordComparatorFactory(new int[] {0}, new Class[] {LongValue.class}, new boolean[] {true});
-	
-	private static final TypePairComparatorFactory<?, ?> pairComparatorFactory = new RecordPairComparatorFactory();
-	
-	
-	private static final int NUM_FILE_HANDLES_PER_SORT = 64;
-	
-	private static final float SORT_SPILL_THRESHOLD = 0.85f;
-	
-	private static final int ITERATION_ID = 1;
-	
-
-//	public static void main(String[] args) throws Exception {
-//		String confPath = args.length >= 6 ? confPath = args[5] : PlayConstants.PLAY_DIR + "local-conf";
-//		
-//		GlobalConfiguration.loadConfiguration(confPath);
-//		Configuration conf = GlobalConfiguration.getConfiguration();
-//		
-//		JobGraph jobGraph = getJobGraph(args);
-//		JobGraphUtils.submit(jobGraph, conf);
-//	}
-		
-	public static JobGraph getJobGraph(String[] args) throws Exception {
-
-		int parallelism = 2;
-		String pageWithRankInputPath = ""; // "file://" + PlayConstants.PLAY_DIR + "test-inputs/danglingpagerank/pageWithRank";
-		String adjacencyListInputPath = ""; // "file://" + PlayConstants.PLAY_DIR +
-//			"test-inputs/danglingpagerank/adjacencylists";
-		String outputPath = OperatingSystem.isWindows() ? "file:/c:/tmp/flink/iterations" : "file:///tmp/flink/iterations";
-//		String confPath = PlayConstants.PLAY_DIR + "local-conf";
-		int minorConsumer = 25;
-		int matchMemory = 50;
-		int coGroupSortMemory = 50;
-		int numIterations = 25;
-		long numVertices = 5;
-		long numDanglingVertices = 1;
-
-		String failingWorkers = "1";
-		int failingIteration = 2;
-		double messageLoss = 0.75;
-
-		if (args.length >= 15) {
-			parallelism = Integer.parseInt(args[0]);
-			pageWithRankInputPath = args[1];
-			adjacencyListInputPath = args[2];
-			outputPath = args[3];
-//			confPath = args[4];
-			minorConsumer = Integer.parseInt(args[5]);
-			matchMemory = Integer.parseInt(args[6]);
-			coGroupSortMemory = Integer.parseInt(args[7]);
-			numIterations = Integer.parseInt(args[8]);
-			numVertices = Long.parseLong(args[9]);
-			numDanglingVertices = Long.parseLong(args[10]);
-			failingWorkers = args[11];
-			failingIteration = Integer.parseInt(args[12]);
-			messageLoss = Double.parseDouble(args[13]);
-		}
-
-		int totalMemoryConsumption = 3*minorConsumer + matchMemory + coGroupSortMemory;
-
-		JobGraph jobGraph = new JobGraph("CompensatableDanglingPageRank");
-		
-		// --------------- the inputs ---------------------
-
-		// page rank input
-		InputFormatVertex pageWithRankInput = JobGraphUtils.createInput(new ImprovedDanglingPageRankInputFormat(),
-			pageWithRankInputPath, "DanglingPageWithRankInput", jobGraph, parallelism);
-		TaskConfig pageWithRankInputConfig = new TaskConfig(pageWithRankInput.getConfiguration());
-		pageWithRankInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
-		pageWithRankInputConfig.setOutputComparator(fieldZeroComparator, 0);
-		pageWithRankInputConfig.setOutputSerializer(recSerializer);
-		pageWithRankInputConfig.setStubParameter("pageRank.numVertices", String.valueOf(numVertices));
-
-		// edges as adjacency list
-		InputFormatVertex adjacencyListInput = JobGraphUtils.createInput(new ImprovedAdjacencyListInputFormat(),
-			adjacencyListInputPath, "AdjancencyListInput", jobGraph, parallelism);
-		TaskConfig adjacencyListInputConfig = new TaskConfig(adjacencyListInput.getConfiguration());
-		adjacencyListInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
-		adjacencyListInputConfig.setOutputSerializer(recSerializer);
-		adjacencyListInputConfig.setOutputComparator(fieldZeroComparator, 0);
-
-		// --------------- the head ---------------------
-		JobVertex head = JobGraphUtils.createTask(IterationHeadPactTask.class, "IterationHead", jobGraph, parallelism);
-		TaskConfig headConfig = new TaskConfig(head.getConfiguration());
-		headConfig.setIterationId(ITERATION_ID);
-		
-		// initial input / partial solution
-		headConfig.addInputToGroup(0);
-		headConfig.setIterationHeadPartialSolutionOrWorksetInputIndex(0);
-		headConfig.setInputSerializer(recSerializer, 0);
-		headConfig.setInputComparator(fieldZeroComparator, 0);
-		headConfig.setInputLocalStrategy(0, LocalStrategy.SORT);
-		headConfig.setRelativeMemoryInput(0, (double)minorConsumer/totalMemoryConsumption);
-		headConfig.setFilehandlesInput(0, NUM_FILE_HANDLES_PER_SORT);
-		headConfig.setSpillingThresholdInput(0, SORT_SPILL_THRESHOLD);
-		
-		// back channel / iterations
-		headConfig.setRelativeBackChannelMemory((double)minorConsumer/totalMemoryConsumption);
-		
-		// output into iteration
-		headConfig.setOutputSerializer(recSerializer);
-		headConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
-		headConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
-		
-		// final output
-		TaskConfig headFinalOutConfig = new TaskConfig(new Configuration());
-		headFinalOutConfig.setOutputSerializer(recSerializer);
-		headFinalOutConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
-		headConfig.setIterationHeadFinalOutputConfig(headFinalOutConfig);
-		
-		// the sync
-		headConfig.setIterationHeadIndexOfSyncOutput(3);
-		headConfig.setNumberOfIterations(numIterations);
-		
-		// the driver 
-		headConfig.setDriver(CollectorMapDriver.class);
-		headConfig.setDriverStrategy(DriverStrategy.COLLECTOR_MAP);
-		headConfig.setStubWrapper(new UserCodeClassWrapper<CompensatingMap>(CompensatingMap.class));
-		headConfig.setStubParameter("pageRank.numVertices", String.valueOf(numVertices));
-		headConfig.setStubParameter("compensation.failingWorker", failingWorkers);
-		headConfig.setStubParameter("compensation.failingIteration", String.valueOf(failingIteration));
-		headConfig.setStubParameter("compensation.messageLoss", String.valueOf(messageLoss));
-		headConfig.addIterationAggregator(CompensatableDotProductCoGroup.AGGREGATOR_NAME, new PageRankStatsAggregator());
-
-		// --------------- the join ---------------------
-		
-		JobVertex intermediate = JobGraphUtils.createTask(IterationIntermediatePactTask.class, "IterationIntermediate", jobGraph, parallelism);
-		TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration());
-		intermediateConfig.setIterationId(ITERATION_ID);
-//		intermediateConfig.setDriver(RepeatableHashjoinMatchDriverWithCachedBuildside.class);
-		intermediateConfig.setDriver(BuildSecondCachedJoinDriver.class);
-		intermediateConfig.setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
-		intermediateConfig.setRelativeMemoryDriver((double)matchMemory/totalMemoryConsumption);
-		intermediateConfig.addInputToGroup(0);
-		intermediateConfig.addInputToGroup(1);
-		intermediateConfig.setInputSerializer(recSerializer, 0);
-		intermediateConfig.setInputSerializer(recSerializer, 1);
-		intermediateConfig.setDriverComparator(fieldZeroComparator, 0);
-		intermediateConfig.setDriverComparator(fieldZeroComparator, 1);
-		intermediateConfig.setDriverPairComparator(pairComparatorFactory);
-		
-		intermediateConfig.setOutputSerializer(recSerializer);
-		intermediateConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
-		intermediateConfig.setOutputComparator(fieldZeroComparator, 0);
-		
-		intermediateConfig.setStubWrapper(new UserCodeClassWrapper<CompensatableDotProductMatch>(CompensatableDotProductMatch.class));
-		intermediateConfig.setStubParameter("pageRank.numVertices", String.valueOf(numVertices));
-		intermediateConfig.setStubParameter("compensation.failingWorker", failingWorkers);
-		intermediateConfig.setStubParameter("compensation.failingIteration", String.valueOf(failingIteration));
-		intermediateConfig.setStubParameter("compensation.messageLoss", String.valueOf(messageLoss));
-
-		// ---------------- the tail (co group) --------------------
-		
-		JobVertex tail = JobGraphUtils.createTask(IterationTailPactTask.class, "IterationTail", jobGraph,
-			parallelism);
-		TaskConfig tailConfig = new TaskConfig(tail.getConfiguration());
-		tailConfig.setIterationId(ITERATION_ID);
-		tailConfig.setIsWorksetUpdate();
-		// TODO we need to combine!
-		
-		// inputs and driver
-		tailConfig.setDriver(CoGroupDriver.class);
-		tailConfig.setDriverStrategy(DriverStrategy.CO_GROUP);
-		tailConfig.addInputToGroup(0);
-		tailConfig.addInputToGroup(1);
-		tailConfig.setInputSerializer(recSerializer, 0);
-		tailConfig.setInputSerializer(recSerializer, 1);
-		tailConfig.setDriverComparator(fieldZeroComparator, 0);
-		tailConfig.setDriverComparator(fieldZeroComparator, 1);
-		tailConfig.setDriverPairComparator(pairComparatorFactory);
-		tailConfig.setInputAsynchronouslyMaterialized(0, true);
-		tailConfig.setRelativeInputMaterializationMemory(0, (double)minorConsumer/totalMemoryConsumption);
-		tailConfig.setInputLocalStrategy(1, LocalStrategy.SORT);
-		tailConfig.setInputComparator(fieldZeroComparator, 1);
-		tailConfig.setRelativeMemoryInput(1, (double)coGroupSortMemory/totalMemoryConsumption);
-		tailConfig.setFilehandlesInput(1, NUM_FILE_HANDLES_PER_SORT);
-		tailConfig.setSpillingThresholdInput(1, SORT_SPILL_THRESHOLD);
-		
-		// output
-		tailConfig.setOutputSerializer(recSerializer);
-		
-		// the stub
-		tailConfig.setStubWrapper(new UserCodeClassWrapper<CompensatableDotProductCoGroup>(CompensatableDotProductCoGroup.class));
-		tailConfig.setStubParameter("pageRank.numVertices", String.valueOf(numVertices));
-		tailConfig.setStubParameter("pageRank.numDanglingVertices", String.valueOf(numDanglingVertices));
-		tailConfig.setStubParameter("compensation.failingWorker", failingWorkers);
-		tailConfig.setStubParameter("compensation.failingIteration", String.valueOf(failingIteration));
-		tailConfig.setStubParameter("compensation.messageLoss", String.valueOf(messageLoss));
-		
-		// --------------- the output ---------------------
-
-		OutputFormatVertex output = JobGraphUtils.createFileOutput(jobGraph, "FinalOutput", parallelism);
-		TaskConfig outputConfig = new TaskConfig(output.getConfiguration());
-		outputConfig.addInputToGroup(0);
-		outputConfig.setInputSerializer(recSerializer, 0);
-		outputConfig.setStubWrapper(new UserCodeClassWrapper<PageWithRankOutFormat>(PageWithRankOutFormat.class));
-		outputConfig.setStubParameter(FileOutputFormat.FILE_PARAMETER_KEY, outputPath);
-		
-		// --------------- the auxiliaries ---------------------
-
-		JobVertex sync = JobGraphUtils.createSync(jobGraph, parallelism);
-		TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
-		syncConfig.setNumberOfIterations(numIterations);
-		syncConfig.addIterationAggregator(CompensatableDotProductCoGroup.AGGREGATOR_NAME, new PageRankStatsAggregator());
-		syncConfig.setConvergenceCriterion(CompensatableDotProductCoGroup.AGGREGATOR_NAME, new DiffL1NormConvergenceCriterion());
-		syncConfig.setIterationId(ITERATION_ID);
-		
-		// --------------- the wiring ---------------------
-
-		JobGraphUtils.connect(pageWithRankInput, head, DistributionPattern.ALL_TO_ALL);
-
-		JobGraphUtils.connect(head, intermediate, DistributionPattern.POINTWISE);
-		intermediateConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
-		
-		JobGraphUtils.connect(adjacencyListInput, intermediate, DistributionPattern.ALL_TO_ALL);
-		
-		JobGraphUtils.connect(head, tail, DistributionPattern.POINTWISE);
-		JobGraphUtils.connect(intermediate, tail, DistributionPattern.ALL_TO_ALL);
-		tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
-		tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(1, parallelism);
-
-		JobGraphUtils.connect(head, output, DistributionPattern.POINTWISE);
-
-		JobGraphUtils.connect(head, sync, DistributionPattern.POINTWISE);
-		
-		SlotSharingGroup sharingGroup = new SlotSharingGroup();
-		pageWithRankInput.setSlotSharingGroup(sharingGroup);
-		adjacencyListInput.setSlotSharingGroup(sharingGroup);
-		head.setSlotSharingGroup(sharingGroup);
-		intermediate.setSlotSharingGroup(sharingGroup);
-		tail.setSlotSharingGroup(sharingGroup);
-		output.setSlotSharingGroup(sharingGroup);
-		sync.setSlotSharingGroup(sharingGroup);
-		
-		tail.setStrictlyCoLocatedWith(head);
-		intermediate.setStrictlyCoLocatedWith(head);
-
-		return jobGraph;
-	}
-}
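
An aside while this job graph builder goes away: the head/intermediate/tail/sync wiring above is essentially the plumbing that a DataSet bulk iteration sets up automatically, with the aggregator and convergence criterion registered on the IterativeDataSet instead of on the sync vertex. A rough, illustrative sketch under that assumption — the tuple layout and the identity step function are placeholders, the class name is made up, and the two helper classes referenced are the ones this commit removes:

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.operators.IterativeDataSet;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.test.iterative.nephele.danglingpagerank.DiffL1NormConvergenceCriterion;
    import org.apache.flink.test.iterative.nephele.danglingpagerank.PageRankStatsAggregator;

    public class BulkIterationSketch {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // illustrative input: (vertexID, rank)
            DataSet<Tuple2<Long, Double>> ranks = env.fromElements(
                    new Tuple2<Long, Double>(1L, 0.5), new Tuple2<Long, Double>(2L, 0.5));

            IterativeDataSet<Tuple2<Long, Double>> iteration = ranks.iterate(25);

            // the registration that the sync vertex config performs above
            iteration.registerAggregationConvergenceCriterion("pagerank.aggregator",
                    new PageRankStatsAggregator(), new DiffL1NormConvergenceCriterion());

            DataSet<Tuple2<Long, Double>> nextRanks = iteration.map(
                    new MapFunction<Tuple2<Long, Double>, Tuple2<Long, Double>>() {
                        @Override
                        public Tuple2<Long, Double> map(Tuple2<Long, Double> value) {
                            return value; // a real step function would recompute the rank here
                        }
                    });

            iteration.closeWith(nextRanks).print();
        }
    }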

http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDotProductCoGroup.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDotProductCoGroup.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDotProductCoGroup.java
deleted file mode 100644
index bd1eeb9..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDotProductCoGroup.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.iterative.nephele.danglingpagerank;
-
-import java.util.Iterator;
-import java.util.Set;
-
-import org.apache.flink.api.java.record.functions.CoGroupFunction;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.iterative.nephele.ConfigUtils;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-@SuppressWarnings("deprecation")
-public class CompensatableDotProductCoGroup extends CoGroupFunction {
-	
-	private static final long serialVersionUID = 1L;
-	
-
-	public static final String AGGREGATOR_NAME = "pagerank.aggregator";
-
-	private Record accumulator = new Record();
-
-	private int workerIndex;
-
-	private int currentIteration;
-
-	private int failingIteration;
-
-	private Set<Integer> failingWorkers;
-
-	private PageRankStatsAggregator aggregator;
-
-	private long numVertices;
-
-	private long numDanglingVertices;
-
-	private double dampingFactor;
-
-	private double danglingRankFactor;
-
-	private static final double BETA = 0.85;
-
-	private final DoubleValue newRank = new DoubleValue();
-
-	private BooleanValue isDangling = new BooleanValue();
-
-	private LongValue vertexID = new LongValue();
-
-	private DoubleValue doubleInstance = new DoubleValue();
-
-	@Override
-	public void open(Configuration parameters) {
-		workerIndex = getRuntimeContext().getIndexOfThisSubtask();
-		currentIteration = getIterationRuntimeContext().getSuperstepNumber();
-		
-		failingIteration = ConfigUtils.asInteger("compensation.failingIteration", parameters);
-		failingWorkers = ConfigUtils.asIntSet("compensation.failingWorker", parameters);
-		numVertices = ConfigUtils.asLong("pageRank.numVertices", parameters);
-		numDanglingVertices = ConfigUtils.asLong("pageRank.numDanglingVertices", parameters);
-
-		dampingFactor = (1d - BETA) / (double) numVertices;
-		
-		aggregator = getIterationRuntimeContext().getIterationAggregator(AGGREGATOR_NAME);
-		
-		if (currentIteration == 1) {
-			danglingRankFactor = BETA * (double) numDanglingVertices / ((double) numVertices * (double) numVertices);
-		} else {
-			PageRankStats previousAggregate = getIterationRuntimeContext().getPreviousIterationAggregate(AGGREGATOR_NAME);
-			danglingRankFactor = BETA * previousAggregate.danglingRank() / (double) numVertices;
-		}
-	}
-
-	@Override
-	public void coGroup(Iterator<Record> currentPageRankIterator, Iterator<Record> partialRanks, Collector<Record> collector) {
-
-		if (!currentPageRankIterator.hasNext()) {
-			long missingVertex = partialRanks.next().getField(0, LongValue.class).getValue();
-			throw new IllegalStateException("No current page rank for vertex [" + missingVertex + "]!");
-		}
-
-		Record currentPageRank = currentPageRankIterator.next();
-
-		long edges = 0;
-		double summedRank = 0;
-		while (partialRanks.hasNext()) {
-			Record pr = partialRanks.next();
-			summedRank += pr.getField(1, doubleInstance).getValue();
-			edges++;
-		}
-
-		double rank = BETA * summedRank + dampingFactor + danglingRankFactor;
-
-		double currentRank = currentPageRank.getField(1, doubleInstance).getValue();
-		isDangling = currentPageRank.getField(2, isDangling);
-
-		double danglingRankToAggregate = isDangling.get() ? rank : 0;
-		long danglingVerticesToAggregate = isDangling.get() ? 1 : 0;
-
-		double diff = Math.abs(currentRank - rank);
-
-		aggregator.aggregate(diff, rank, danglingRankToAggregate, danglingVerticesToAggregate, 1, edges, summedRank, 0);
-
-		newRank.setValue(rank);
-
-		accumulator.setField(0, currentPageRank.getField(0, vertexID));
-		accumulator.setField(1, newRank);
-		accumulator.setField(2, isDangling);
-
-		collector.collect(accumulator);
-	}
-
-	@Override
-	public void close() throws Exception {
-		if (currentIteration == failingIteration && failingWorkers.contains(workerIndex)) {
-			aggregator.reset();
-		}
-	}
-}
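
For quick reference while this class goes away: its coGroup applies the usual dangling-PageRank update, new rank = BETA * (sum of partial ranks) + (1 - BETA)/n + BETA * (previous dangling mass)/n. A tiny self-contained sketch of just that arithmetic — the class and method names are illustrative, the constant mirrors BETA above:

    public class PageRankUpdateSketch {

        private static final double BETA = 0.85;

        /** New rank from the summed partial ranks, the vertex count, and the dangling rank of the previous superstep. */
        static double newRank(double summedPartialRanks, long numVertices, double previousDanglingRank) {
            double dampingFactor = (1d - BETA) / numVertices;                      // uniform (1 - beta)/n share
            double danglingRankFactor = BETA * previousDanglingRank / numVertices; // redistributed dangling mass
            return BETA * summedPartialRanks + dampingFactor + danglingRankFactor;
        }

        public static void main(String[] args) {
            // e.g. incoming partial ranks summing to 0.2, 10 vertices, 0.05 dangling mass from the last superstep
            System.out.println(newRank(0.2, 10, 0.05));
        }
    }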

http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDotProductMatch.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDotProductMatch.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDotProductMatch.java
deleted file mode 100644
index 0508886..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDotProductMatch.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.iterative.nephele.danglingpagerank;
-
-import java.util.Random;
-import java.util.Set;
-
-import org.apache.flink.api.java.record.functions.JoinFunction;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.iterative.nephele.ConfigUtils;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-@SuppressWarnings("deprecation")
-public class CompensatableDotProductMatch extends JoinFunction {
-
-	private static final long serialVersionUID = 1L;
-
-	private Record record;
-
-	private LongValue vertexID;
-
-	private DoubleValue partialRank;
-
-	private DoubleValue rank = new DoubleValue();
-
-	private LongArrayView adjacentNeighbors = new LongArrayView();
-
-	private int workerIndex;
-
-	private int currentIteration;
-
-	private int failingIteration;
-
-	private Set<Integer> failingWorkers;
-
-	private double messageLoss;
-
-	private Random random;
-
-	@Override
-	public void open(Configuration parameters) {
-		record = new Record();
-		vertexID = new LongValue();
-		partialRank = new DoubleValue();
-
-		workerIndex = getRuntimeContext().getIndexOfThisSubtask();
-		currentIteration = getIterationRuntimeContext().getSuperstepNumber();
-		failingIteration = ConfigUtils.asInteger("compensation.failingIteration", parameters);
-		failingWorkers = ConfigUtils.asIntSet("compensation.failingWorker", parameters);
-		messageLoss = ConfigUtils.asDouble("compensation.messageLoss", parameters);
-
-		random = new Random();
-	}
-
-	@Override
-	public void join(Record pageWithRank, Record adjacencyList, Collector<Record> collector) {
-
-		rank = pageWithRank.getField(1, rank);
-		adjacentNeighbors = adjacencyList.getField(1, adjacentNeighbors);
-		int numNeighbors = adjacentNeighbors.size();
-
-		double rankToDistribute = rank.getValue() / (double) numNeighbors;
-
-		partialRank.setValue(rankToDistribute);
-		record.setField(1, partialRank);
-
-		boolean isFailure = currentIteration == failingIteration && failingWorkers.contains(workerIndex);
-
-		for (int n = 0; n < numNeighbors; n++) {
-			vertexID.setValue(adjacentNeighbors.getQuick(n));
-			record.setField(0, vertexID);
-
-			if (isFailure) {
-				if (random.nextDouble() >= messageLoss) {
-					collector.collect(record);
-				}
-			} else {
-				collector.collect(record);
-			}
-		}
-	}
-}
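
The join above simulates lost messages: on a worker marked as failing, and only in the failing iteration, each outgoing partial rank is dropped with probability messageLoss. A minimal sketch of just that filter decision — class and method names are illustrative:

    import java.util.Random;

    public class MessageLossSketch {

        /** Mirrors the per-record decision inside the join's emit loop above. */
        static boolean shouldEmit(boolean isFailure, double messageLoss, Random random) {
            return !isFailure || random.nextDouble() >= messageLoss;
        }

        public static void main(String[] args) {
            Random random = new Random(42);
            int emitted = 0;
            for (int i = 0; i < 10000; i++) {
                if (shouldEmit(true, 0.3, random)) {
                    emitted++;
                }
            }
            System.out.println("emitted (expect roughly 7000 of 10000): " + emitted);
        }
    }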

http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatingMap.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatingMap.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatingMap.java
deleted file mode 100644
index d8189ef..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatingMap.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.iterative.nephele.danglingpagerank;
-
-import java.util.Set;
-
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.iterative.nephele.ConfigUtils;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-@SuppressWarnings("deprecation")
-public class CompensatingMap extends MapFunction {
-
-	private static final long serialVersionUID = 1L;
-
-	private int workerIndex;
-
-	private int currentIteration;
-
-	private long numVertices;
-
-	private int failingIteration;
-
-	private Set<Integer> failingWorkers;
-
-	private double uniformRank;
-
-	private double rescaleFactor;
-
-	private DoubleValue rank = new DoubleValue();
-
-	@Override
-	public void open(Configuration parameters) {
-
-		workerIndex = getRuntimeContext().getIndexOfThisSubtask();
-		currentIteration = getIterationRuntimeContext().getSuperstepNumber();
-		failingIteration = ConfigUtils.asInteger("compensation.failingIteration", parameters);
-		failingWorkers = ConfigUtils.asIntSet("compensation.failingWorker", parameters);
-		numVertices = ConfigUtils.asLong("pageRank.numVertices", parameters);
-
-		if (currentIteration > 1) {
-			PageRankStats stats = (PageRankStats) getIterationRuntimeContext().getPreviousIterationAggregate(
-				CompensatableDotProductCoGroup.AGGREGATOR_NAME);
-
-			uniformRank = 1d / (double) numVertices;
-			double lostMassFactor = (numVertices - stats.numVertices()) / (double) numVertices;
-			rescaleFactor = (1 - lostMassFactor) / stats.rank();
-		}
-	}
-
-	@Override
-	public void map(Record pageWithRank, Collector<Record> out) {
-
-		if (currentIteration == failingIteration + 1) {
-
-			rank = pageWithRank.getField(1, rank);
-
-			if (failingWorkers.contains(workerIndex)) {
-				rank.setValue(uniformRank);
-			} else {
-				rank.setValue(rank.getValue() * rescaleFactor);
-			}
-			pageWithRank.setField(1, rank);
-		}
-		
-		out.collect(pageWithRank);
-	}
-}
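
The map above is the compensation step that runs right after the simulated failure: vertices on failed workers fall back to the uniform rank 1/n, while vertices on surviving workers are rescaled using the aggregate from the previous superstep. A small standalone sketch of the two branches — the helper is illustrative, and aggregateVertices/aggregateRank stand in for stats.numVertices() and stats.rank():

    public class CompensationSketch {

        /** Mirrors the two branches of CompensatingMap.map for a single rank value. */
        static double compensate(double oldRank, boolean onFailedWorker,
                long numVertices, long aggregateVertices, double aggregateRank) {
            if (onFailedWorker) {
                return 1d / numVertices; // state lost: restart from the uniform distribution
            }
            double lostMassFactor = (numVertices - aggregateVertices) / (double) numVertices;
            double rescaleFactor = (1 - lostMassFactor) / aggregateRank;
            return oldRank * rescaleFactor;
        }

        public static void main(String[] args) {
            // 100 vertices, 80 reported in the previous superstep with a total rank of 0.76
            System.out.println(compensate(0.01, false, 100, 80, 0.76)); // 0.01 * (0.8 / 0.76)
        }
    }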

http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/DanglingPageGenerateRankInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/DanglingPageGenerateRankInputFormat.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/DanglingPageGenerateRankInputFormat.java
deleted file mode 100644
index 4da2bf2..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/DanglingPageGenerateRankInputFormat.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.iterative.nephele.danglingpagerank;
-
-import java.util.regex.Pattern;
-
-import org.apache.flink.api.java.record.io.TextInputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.iterative.nephele.ConfigUtils;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.Record;
-
-public class DanglingPageGenerateRankInputFormat extends TextInputFormat {
-
-	private static final long serialVersionUID = 1L;
-
-	private DoubleValue initialRank;
-
-	private static final Pattern SEPARATOR = Pattern.compile("[, \t]");
-
-	@Override
-	public void configure(Configuration parameters) {
-		long numVertices = ConfigUtils.asLong("pageRank.numVertices", parameters);
-		initialRank = new DoubleValue(1 / (double) numVertices);
-		super.configure(parameters);
-	}
-
-	@Override
-	public Record readRecord(Record target, byte[] bytes, int offset, int numBytes) {
-		String str = new String(bytes, offset, numBytes);
-
-		String[] tokens = SEPARATOR.split(str);
-
-		long vertexID = Long.parseLong(tokens[0]);
-		boolean isDangling = tokens.length > 1 && Integer.parseInt(tokens[1]) == 1;
-
-		target.clear();
-		target.addField(new LongValue(vertexID));
-		target.addField(initialRank);
-		target.addField(new BooleanValue(isDangling));
-
-		return target;
-	}
-}
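
A note on the expected input, for anyone looking for the data this format reads: one page per line, the vertex id optionally followed by a 1 that marks the page as dangling, separated by comma, space, or tab (see SEPARATOR above); the rank field is always initialized to 1/numVertices. Two made-up example lines, not taken from any checked-in test data:

    1
    2 1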

http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/DiffL1NormConvergenceCriterion.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/DiffL1NormConvergenceCriterion.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/DiffL1NormConvergenceCriterion.java
deleted file mode 100644
index 9868173..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/DiffL1NormConvergenceCriterion.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.iterative.nephele.danglingpagerank;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
-
-@SuppressWarnings("serial")
-public class DiffL1NormConvergenceCriterion implements ConvergenceCriterion<PageRankStats> {
-
-	private static final double EPSILON = 0.00005;
-
-	private static final Logger log = LoggerFactory.getLogger(DiffL1NormConvergenceCriterion.class);
-
-	@Override
-	public boolean isConverged(int iteration, PageRankStats pageRankStats) {
-		double diff = pageRankStats.diff();
-
-		if (log.isInfoEnabled()) {
-			log.info("Stats in iteration [" + iteration + "]: " + pageRankStats);
-			log.info("L1 norm of the vector difference is [" + diff + "] in iteration [" + iteration + "]");
-		}
-
-		return diff < EPSILON;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/ImprovedAdjacencyListInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/ImprovedAdjacencyListInputFormat.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/ImprovedAdjacencyListInputFormat.java
deleted file mode 100644
index df2fed6..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/ImprovedAdjacencyListInputFormat.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.iterative.nephele.danglingpagerank;
-
-import org.apache.flink.api.java.record.io.DelimitedInputFormat;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.Record;
-
-public class ImprovedAdjacencyListInputFormat extends DelimitedInputFormat {
-  private static final long serialVersionUID = 1L;
-
-  private final LongValue vertexID = new LongValue();
-  private final AsciiLongArrayView arrayView = new AsciiLongArrayView();
-  private final LongArrayView adjacentVertices = new LongArrayView();
-
-  @Override
-  public Record readRecord(Record target, byte[] bytes, int offset, int numBytes) {
-
-    if (numBytes == 0) {
-      return null;
-    }
-
-    arrayView.set(bytes, offset, numBytes);
-
-    int numElements = arrayView.numElements();
-    adjacentVertices.allocate(numElements - 1);
-
-    try {
-
-      int pos = 0;
-      while (arrayView.next()) {
-
-        if (pos == 0) {
-          vertexID.setValue(arrayView.element());
-        } else {
-          adjacentVertices.setQuick(pos - 1, arrayView.element());
-        }
-
-        pos++;
-      }
-
-      //sanity check
-      if (pos != numElements) {
-        throw new IllegalStateException("Should have gotten " + numElements + " elements, but saw " + pos);
-      }
-
-    } catch (RuntimeException e) {
-      throw new RuntimeException("Error parsing: " + arrayView.toString(), e);
-    }
-
-    target.clear();
-    target.addField(vertexID);
-    target.addField(adjacentVertices);
-    return target;
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/ImprovedDanglingPageRankInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/ImprovedDanglingPageRankInputFormat.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/ImprovedDanglingPageRankInputFormat.java
deleted file mode 100644
index cfa64c8..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/ImprovedDanglingPageRankInputFormat.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.iterative.nephele.danglingpagerank;
-
-import org.apache.flink.api.java.record.io.DelimitedInputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.iterative.nephele.ConfigUtils;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.Record;
-
-public class ImprovedDanglingPageRankInputFormat extends DelimitedInputFormat {
-	private static final long serialVersionUID = 1L;
-
-	private LongValue vertexID = new LongValue();
-
-	private DoubleValue initialRank;
-
-	private BooleanValue isDangling = new BooleanValue();
-
-	private AsciiLongArrayView arrayView = new AsciiLongArrayView();
-
-	private static final long DANGLING_MARKER = 1l;
-
-	@Override
-	public void configure(Configuration parameters) {
-		long numVertices = ConfigUtils.asLong("pageRank.numVertices", parameters);
-		initialRank = new DoubleValue(1 / (double) numVertices);
-		super.configure(parameters);
-	}
-
-	@Override
-	public Record readRecord(Record target, byte[] bytes, int offset, int numBytes) {
-		arrayView.set(bytes, offset, numBytes);
-
-		try {
-			arrayView.next();
-			vertexID.setValue(arrayView.element());
-
-			if (arrayView.next()) {
-				isDangling.set(arrayView.element() == DANGLING_MARKER);
-			} else {
-				isDangling.set(false);
-			}
-
-		} catch (NumberFormatException e) {
-			throw new RuntimeException("Error parsing " + arrayView.toString(), e);
-		}
-
-		target.clear();
-		target.addField(vertexID);
-		target.addField(initialRank);
-		target.addField(isDangling);
-		return target;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/LongArrayView.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/LongArrayView.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/LongArrayView.java
deleted file mode 100644
index 825b210..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/LongArrayView.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.iterative.nephele.danglingpagerank;
-
-import java.io.IOException;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.types.Value;
-
-public class LongArrayView implements Value {
-  private static final long serialVersionUID = 1L;
-
-  private long[] entries = new long[0];
-  private int numEntries = 0;
-
-  public LongArrayView() {
-  }
-
-  public long get(int index) {
-    if (index >= numEntries) {
-      throw new ArrayIndexOutOfBoundsException();
-    }
-    return getQuick(index);
-  }
-
-  public long getQuick(int index) {
-    return entries[index];
-  }
-
-  public void allocate(int numEntries) {
-    this.numEntries = numEntries;
-    ensureCapacity();
-  }
-
-  public void set(int index, long value) {
-    if (index >= numEntries) {
-      throw new ArrayIndexOutOfBoundsException();
-    }
-    setQuick(index, value);
-  }
-
-  public void setQuick(int index, long value) {
-    entries[index] = value;
-  }
-
-  public int size() {
-    return numEntries;
-  }
-
-  private void ensureCapacity() {
-    if (entries.length < numEntries) {
-      entries = new long[numEntries];
-    }
-  }
-
-  public void write(DataOutputView out) throws IOException {
-    out.writeInt(numEntries);
-    for (int n = 0; n < numEntries; n++) {
-      out.writeLong(entries[n]);
-    }
-  }
-
-  public void read(DataInputView in) throws IOException {
-    numEntries = in.readInt();
-    ensureCapacity();
-    for (int n = 0; n < numEntries; n++) {
-      entries[n] = in.readLong();
-    }
-  }
-}
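
Since LongArrayView disappears with this commit, here is a tiny usage sketch of its API as exercised by ImprovedAdjacencyListInputFormat above. It assumes the removed class is still on the classpath; the sketch class name is made up:

    import org.apache.flink.test.iterative.nephele.danglingpagerank.LongArrayView;

    public class LongArrayViewUsageSketch {

        public static void main(String[] args) {
            LongArrayView neighbors = new LongArrayView();
            neighbors.allocate(3);               // sets the logical size, growing the backing array if needed
            for (int i = 0; i < neighbors.size(); i++) {
                neighbors.setQuick(i, 100L + i); // unchecked write, like the input format's parse loop
            }
            System.out.println(neighbors.get(2)); // bounds-checked read -> 102
        }
    }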