You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ik...@apache.org on 2015/06/11 04:50:41 UTC
[2/5] git commit: updated refs/heads/trunk to 819d6d3
http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/internal/ReducersForPieceHandler.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/internal/ReducersForPieceHandler.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/internal/ReducersForPieceHandler.java
new file mode 100644
index 0000000..f06dd89
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/internal/ReducersForPieceHandler.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.piece.global_comm.internal;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.giraph.block_app.framework.api.BlockMasterApi;
+import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle;
+import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
+import org.apache.giraph.master.MasterGlobalCommUsage;
+import org.apache.giraph.reducers.ReduceOperation;
+import org.apache.giraph.reducers.Reducer;
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.giraph.worker.WorkerBroadcastUsage;
+import org.apache.giraph.worker.WorkerReduceUsage;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * All logic for transforming Giraph's reducer API to reducer handles.
+ * Contains state of active reducers, and is kept within a Piece.
+ */
+public class ReducersForPieceHandler implements VertexSenderObserver {
+ private static final AtomicInteger HANDLER_COUNTER = new AtomicInteger();
+ private static final AtomicInteger BROADCAST_COUNTER = new AtomicInteger();
+
+ private final int handleIndex = HANDLER_COUNTER.incrementAndGet();
+ private final AtomicInteger reduceCounter = new AtomicInteger();
+
+ private final ArrayList<VertexSenderObserver> observers = new ArrayList<>();
+
+ @Override
+ public void vertexSenderWorkerPreprocess(WorkerReduceUsage usage) {
+ for (VertexSenderObserver observer : observers) {
+ observer.vertexSenderWorkerPreprocess(usage);
+ }
+ }
+
+ @Override
+ public void vertexSenderWorkerPostprocess(WorkerReduceUsage usage) {
+ for (VertexSenderObserver observer : observers) {
+ observer.vertexSenderWorkerPostprocess(usage);
+ }
+ }
+
+ public <S, R extends Writable> ReducerHandle<S, R> createLocalReducer(
+ MasterGlobalCommUsage master, ReduceOperation<S, R> reduceOp,
+ R globalInitialValue) {
+ LocalReduceHandle<S, R> handle = new LocalReduceHandle<>(reduceOp);
+ master.registerReducer(handle.getName(), reduceOp, globalInitialValue);
+ observers.add(handle);
+ return handle;
+ }
+
+ public <S, R extends Writable> ReducerHandle<S, R> createGlobalReducer(
+ MasterGlobalCommUsage master, ReduceOperation<S, R> reduceOp,
+ R globalInitialValue) {
+ ReduceHandleImpl<S, R> handle = new GlobalReduceHandle<>(reduceOp);
+ master.registerReducer(handle.getName(), reduceOp, globalInitialValue);
+ observers.add(handle);
+ return handle;
+ }
+
+ /**
+ * Implementation of BroadcastHandle
+ *
+ * @param <T> Value type
+ */
+ public static class BroadcastHandleImpl<T> implements BroadcastHandle<T> {
+ private final String name;
+
+ public BroadcastHandleImpl() {
+ this.name = "_utils.broadcast." + BROADCAST_COUNTER.incrementAndGet();
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public T getBroadcast(WorkerBroadcastUsage worker) {
+ return worker.getBroadcast(name);
+ }
+ }
+
+ /**
+ * Parent implementation of ReducerHandle
+ *
+ * @param <S> Single value type
+ * @param <R> Reduced value type
+ */
+ public abstract class ReduceHandleImpl<S, R extends Writable>
+ implements ReducerHandle<S, R>, VertexSenderObserver {
+ protected final ReduceOperation<S, R> reduceOp;
+ private final String name;
+
+ private ReduceHandleImpl(ReduceOperation<S, R> reduceOp) {
+ this.reduceOp = reduceOp;
+ name = "_utils." + handleIndex +
+ ".reduce." + reduceCounter.incrementAndGet();
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public R getReducedValue(MasterGlobalCommUsage master) {
+ return master.getReduced(name);
+ }
+
+ @Override
+ public BroadcastHandle<R> broadcastValue(BlockMasterApi master) {
+ return unwrapHandle(master.broadcast(
+ new WrappedReducedValue<>(reduceOp, getReducedValue(master))));
+ }
+ }
+
+ private static <R extends Writable> BroadcastHandle<R> unwrapHandle(
+ final BroadcastHandle<WrappedReducedValue<R>> handle) {
+ return new BroadcastHandle<R>() {
+ @Override
+ public R getBroadcast(WorkerBroadcastUsage worker) {
+ return handle.getBroadcast(worker).getValue();
+ }
+ };
+ }
+
+ /**
+ * Wrapper that makes reduced values self-serializable,
+ * and allows them to be broadcasted.
+ *
+ * @param <R> Reduced value type
+ */
+ public static class WrappedReducedValue<R extends Writable>
+ implements Writable {
+ private ReduceOperation<?, R> reduceOp;
+ private R value;
+
+ public WrappedReducedValue() {
+ }
+
+ public WrappedReducedValue(ReduceOperation<?, R> reduceOp, R value) {
+ this.reduceOp = reduceOp;
+ this.value = value;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ WritableUtils.writeWritableObject(reduceOp, out);
+ value.write(out);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ reduceOp = WritableUtils.readWritableObject(in, null);
+ value = reduceOp.createInitialValue();
+ value.readFields(in);
+ }
+
+ public R getValue() {
+ return value;
+ }
+ }
+
+ /**
+ * Global Reduce Handle is implementation of ReducerHandle, that will keep
+ * only one value for each worker, and each call to reduce will have
+ * to obtain a global lock, and incur synchronization costs.
+ * Use only when objects are so large, that having many copies cannot fit
+ * into memory.
+ *
+ * @param <S> Single value type
+ * @param <R> Reduced value type
+ */
+ public class GlobalReduceHandle<S, R extends Writable>
+ extends ReduceHandleImpl<S, R> {
+ private transient WorkerReduceUsage usage;
+
+ public GlobalReduceHandle(ReduceOperation<S, R> reduceOp) {
+ super(reduceOp);
+ }
+
+ @Override
+ public void vertexSenderWorkerPreprocess(WorkerReduceUsage usage) {
+ this.usage = usage;
+ }
+
+ @Override
+ public void reduce(S valueToReduce) {
+ usage.reduce(getName(), valueToReduce);
+ }
+
+ @Override
+ public void vertexSenderWorkerPostprocess(WorkerReduceUsage usage) {
+ }
+ }
+
+ /**
+ * Local Reduce Handle is implementation of ReducerHandle, that will make a
+ * partially reduced value on each worker thread, which are at the end
+ * reduced all together.
+ * This is preferred implementation, unless it cannot be used due to memory
+ * overhead, because all partially reduced values will not fit the memory.
+ *
+ * @param <S> Single value type
+ * @param <R> Reduced value type
+ */
+ public class LocalReduceHandle<S, R extends Writable>
+ extends ReduceHandleImpl<S, R> {
+ private transient Reducer<S, R> reducer;
+
+ public LocalReduceHandle(ReduceOperation<S, R> reduceOp) {
+ super(reduceOp);
+ }
+
+ @Override
+ public void vertexSenderWorkerPreprocess(WorkerReduceUsage usage) {
+ this.reducer = new Reducer<>(reduceOp);
+ }
+
+ @Override
+ public void reduce(S valueToReduce) {
+ reducer.reduce(valueToReduce);
+ }
+
+ @Override
+ public void vertexSenderWorkerPostprocess(WorkerReduceUsage usage) {
+ usage.reduceMerge(getName(), reducer.getCurrentValue());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/internal/VertexSenderObserver.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/internal/VertexSenderObserver.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/internal/VertexSenderObserver.java
new file mode 100644
index 0000000..5b3485f
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/internal/VertexSenderObserver.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.piece.global_comm.internal;
+
+import org.apache.giraph.worker.WorkerReduceUsage;
+
+/**
+ * Observer able to hook into vertex sender pre/post processing.
+ *
+ * Both callbacks are handed a WorkerReduceUsage. ReducersForPieceHandler
+ * implements this interface and forwards each callback to every reducer
+ * handle it has created.
+ */
+public interface VertexSenderObserver {
+ void vertexSenderWorkerPreprocess(WorkerReduceUsage usage);
+ void vertexSenderWorkerPostprocess(WorkerReduceUsage usage);
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/internal/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/internal/package-info.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/internal/package-info.java
new file mode 100644
index 0000000..1ba7c8f
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/internal/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Reducer and Broadcast Handles internal implementation for automatic handling
+ * of global communication within Pieces, hiding a lot of its complexities.
+ */
+package org.apache.giraph.block_app.framework.piece.global_comm.internal;
http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/map/BroadcastMapHandle.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/map/BroadcastMapHandle.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/map/BroadcastMapHandle.java
new file mode 100644
index 0000000..50d7818
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/map/BroadcastMapHandle.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.piece.global_comm.map;
+
+import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle;
+import org.apache.giraph.worker.WorkerBroadcastUsage;
+
+/**
+ * Handle to a map of broadcasts: a MapHandle whose values are
+ * BroadcastHandle instances.
+ *
+ * @param <K> Key type
+ * @param <V> Value type carried by each BroadcastHandle
+ */
+public interface BroadcastMapHandle<K, V>
+ extends MapHandle<K, BroadcastHandle<V>> {
+
+ /**
+ * Number of elements that were broadcasted.
+ *
+ * @param worker Worker broadcast usage
+ * @return Number of elements that were broadcasted
+ */
+ int getBroadcastedSize(WorkerBroadcastUsage worker);
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/map/MapHandle.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/map/MapHandle.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/map/MapHandle.java
new file mode 100644
index 0000000..db01e77
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/map/MapHandle.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.piece.global_comm.map;
+
+/**
+ * Handle to a map of underlying handles, addressed by key.
+ *
+ * @param <K> Key type
+ * @param <V> Value type
+ */
+public interface MapHandle<K, V> {
+ /**
+ * Get value for key.
+ *
+ * @param key Key to look up
+ * @return Value for the given key
+ */
+ V get(K key);
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/map/ReducerMapHandle.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/map/ReducerMapHandle.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/map/ReducerMapHandle.java
new file mode 100644
index 0000000..5c31179
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/map/ReducerMapHandle.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.piece.global_comm.map;
+
+import org.apache.giraph.block_app.framework.api.BlockMasterApi;
+import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
+
+/**
+ * Handle to a map of reducers: a MapHandle whose values are
+ * ReducerHandle instances.
+ *
+ * @param <I> Key type
+ * @param <S> Single value type
+ * @param <R> Reduced value type
+ */
+public interface ReducerMapHandle<I, S, R>
+ extends MapHandle<I, ReducerHandle<S, R>> {
+ /**
+ * Number of elements that were reduced.
+ *
+ * @param master Master API
+ * @return Number of elements that were reduced
+ */
+ int getReducedSize(BlockMasterApi master);
+
+ /**
+ * Broadcast whole map of reducers, using the given master API.
+ * NOTE(review): this comment previously read "to master"; since the call
+ * takes the master API and returns a broadcast handle, it presumably
+ * broadcasts from the master - confirm against the implementations.
+ *
+ * @param master Master API
+ * @return Handle to the broadcasted map.
+ */
+ BroadcastMapHandle<I, R> broadcastValue(BlockMasterApi master);
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/map/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/map/package-info.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/map/package-info.java
new file mode 100644
index 0000000..092f864
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/map/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Interfaces representing map of individual handles
+ */
+package org.apache.giraph.block_app.framework.piece.global_comm.map;
http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/package-info.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/package-info.java
new file mode 100644
index 0000000..0d40741
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Interfaces for Reducer and Broadcast Handles for automatic handling
+ * of global communication within Pieces, hiding a lot of its complexities.
+ */
+package org.apache.giraph.block_app.framework.piece.global_comm;
http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/interfaces/VertexPostprocessor.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/interfaces/VertexPostprocessor.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/interfaces/VertexPostprocessor.java
new file mode 100644
index 0000000..b6cc749
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/interfaces/VertexPostprocessor.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.piece.interfaces;
+
+import org.apache.giraph.writable.kryo.markers.NonKryoWritable;
+
+/**
+ * Interface containing a single function - postprocess.
+ *
+ * Marked to not allow serialization, as it should be created on the worker,
+ * so should never be serialized; disallowed only for catching problems early.
+ */
+public interface VertexPostprocessor extends NonKryoWritable {
+ /**
+ * Override to finish computation. This method is executed exactly once
+ * after computation for all vertices in the partition is complete.
+ */
+ void postprocess();
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/interfaces/VertexReceiver.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/interfaces/VertexReceiver.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/interfaces/VertexReceiver.java
new file mode 100644
index 0000000..26912ee
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/interfaces/VertexReceiver.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.piece.interfaces;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.writable.kryo.markers.NonKryoWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Interface representing actions that happen on worker, for each vertex,
+ * during receive phase:
+ * <ul>
+ * <li> to receive messages from vertices </li>
+ * <li> to receive data from master through aggregators </li>
+ * </ul>
+ *
+ * Marked to not allow serialization, as it should be created on the worker,
+ * so should never be serialized; disallowed only for catching problems early.
+ *
+ * @param <I> Vertex id type
+ * @param <V> Vertex value type
+ * @param <E> Edge value type
+ * @param <M> Message type
+ */
+@SuppressWarnings("rawtypes")
+public interface VertexReceiver<I extends WritableComparable,
+ V extends Writable, E extends Writable, M extends Writable>
+ extends NonKryoWritable {
+ /**
+ * Must be defined by user to do computation on a single Vertex.
+ *
+ * @param vertex Vertex
+ * @param messages Messages that were sent to this vertex in the previous
+ * superstep. Each message is only guaranteed to stay valid
+ * until next() is called on the iterator, so do not keep
+ * references to it beyond that point.
+ */
+ void vertexReceive(Vertex<I, V, E> vertex, Iterable<M> messages);
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/interfaces/VertexSender.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/interfaces/VertexSender.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/interfaces/VertexSender.java
new file mode 100644
index 0000000..0587032
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/interfaces/VertexSender.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.piece.interfaces;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.writable.kryo.markers.NonKryoWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Interface representing actions that happen on worker, for each vertex,
+ * during send phase:
+ * <ul>
+ * <li> to send messages to vertices </li>
+ * <li> to send data for aggregation on master </li>
+ * </ul>
+ *
+ * Marked to not allow serialization, as it should be created on the worker,
+ * so should never be serialized; disallowed only for catching problems early.
+ *
+ * @param <I> Vertex id type
+ * @param <V> Vertex value type
+ * @param <E> Edge value type
+ */
+@SuppressWarnings("rawtypes")
+public interface VertexSender<I extends WritableComparable,
+ V extends Writable, E extends Writable> extends NonKryoWritable {
+ /**
+ * Must be defined by user to do computation on a single Vertex.
+ *
+ * @param vertex Vertex to process during the send phase
+ */
+ void vertexSend(Vertex<I, V, E> vertex);
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/interfaces/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/interfaces/package-info.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/interfaces/package-info.java
new file mode 100644
index 0000000..db05e78
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/interfaces/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Vertex processing functions for Pieces
+ */
+package org.apache.giraph.block_app.framework.piece.interfaces;
http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/messages/ObjectMessageClasses.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/messages/ObjectMessageClasses.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/messages/ObjectMessageClasses.java
new file mode 100644
index 0000000..dd977e6
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/messages/ObjectMessageClasses.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.piece.messages;
+
+import org.apache.giraph.combiner.MessageCombiner;
+import org.apache.giraph.comm.messages.MessageEncodeAndStoreType;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.MessageClasses;
+import org.apache.giraph.factories.MessageValueFactory;
+import org.apache.giraph.utils.ReflectionUtils;
+import org.apache.giraph.writable.kryo.KryoWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * MessageClasses implementation that provides factory and combiner instances
+ * through a provided supplier.
+ *
+ * @param <I> Vertex id type
+ * @param <M> Message type
+ */
+public class ObjectMessageClasses<I extends WritableComparable,
+ M extends Writable> extends KryoWritable implements MessageClasses<I, M> {
+ private final Class<M> messageClass;
+ private final SupplierFromConf<MessageValueFactory<M>>
+ messageValueFactorySupplier;
+ private final SupplierFromConf<? extends MessageCombiner<? super I, M>>
+ messageCombinerSupplier;
+ private final MessageEncodeAndStoreType messageEncodeAndStoreType;
+
+ public ObjectMessageClasses() {
+ this(null, null, null, null);
+ }
+
+ public ObjectMessageClasses(Class<M> messageClass,
+ SupplierFromConf<MessageValueFactory<M>> messageValueFactorySupplier,
+ SupplierFromConf<? extends MessageCombiner<? super I, M>>
+ messageCombinerSupplier,
+ MessageEncodeAndStoreType messageEncodeAndStoreType) {
+ this.messageClass = messageClass;
+ this.messageValueFactorySupplier = messageValueFactorySupplier;
+ this.messageCombinerSupplier = messageCombinerSupplier;
+ this.messageEncodeAndStoreType = messageEncodeAndStoreType;
+ }
+
+ @Override
+ public Class<M> getMessageClass() {
+ return messageClass;
+ }
+
+ @Override
+ public MessageValueFactory<M> createMessageValueFactory(
+ ImmutableClassesGiraphConfiguration conf) {
+ return Preconditions.checkNotNull(messageValueFactorySupplier.apply(conf));
+ }
+
+ @Override
+ public MessageCombiner<? super I, M> createMessageCombiner(
+ ImmutableClassesGiraphConfiguration<I, ? extends Writable,
+ ? extends Writable> conf) {
+ return messageCombinerSupplier != null ?
+ Preconditions.checkNotNull(messageCombinerSupplier.apply(conf)) : null;
+ }
+
+ @Override
+ public boolean useMessageCombiner() {
+ return messageCombinerSupplier != null;
+ }
+
+ @Override
+ public MessageEncodeAndStoreType getMessageEncodeAndStoreType() {
+ return messageEncodeAndStoreType;
+ }
+
+ @Override
+ public MessageClasses<I, M> createCopyForNewSuperstep() {
+ return new ObjectMessageClasses<>(
+ messageClass, messageValueFactorySupplier,
+ messageCombinerSupplier, messageEncodeAndStoreType);
+ }
+
+ @Override
+ public void verifyConsistent(ImmutableClassesGiraphConfiguration conf) {
+ MessageValueFactory<M> messageValueFactory =
+ messageValueFactorySupplier.apply(conf);
+ Preconditions.checkState(
+ messageValueFactory.newInstance().getClass().equals(messageClass));
+
+ if (messageCombinerSupplier != null) {
+ MessageCombiner<? super I, M> messageCombiner =
+ messageCombinerSupplier.apply(conf);
+ Preconditions.checkState(messageCombiner.createInitialMessage()
+ .getClass().equals(messageClass));
+ Class<?>[] combinerTypes = ReflectionUtils.getTypeArguments(
+ MessageCombiner.class, messageCombiner.getClass());
+ ReflectionUtils.verifyTypes(conf.getVertexIdClass(), combinerTypes[0],
+ "Vertex id", messageCombiner.getClass());
+ ReflectionUtils.verifyTypes(messageClass, combinerTypes[1],
+ "Outgoing message", messageCombiner.getClass());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/messages/SupplierFromConf.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/messages/SupplierFromConf.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/messages/SupplierFromConf.java
new file mode 100644
index 0000000..00c86cd
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/messages/SupplierFromConf.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.piece.messages;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.factories.DefaultMessageValueFactory;
+import org.apache.giraph.factories.MessageValueFactory;
+import org.apache.giraph.function.Function;
+import org.apache.giraph.writable.kryo.HadoopKryo;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Supplier from configuration
+ * @param <T> Type of object returned
+ */
+public interface SupplierFromConf<T>
+ extends Function<ImmutableClassesGiraphConfiguration, T> {
+
+ /**
+ * Supplier from configuration, by copying given instance every time.
+ *
+ * @param <T> Type of object returned
+ */
+ public static class SupplierFromConfByCopy<T> implements SupplierFromConf<T> {
+ private final T value;
+
+ public SupplierFromConfByCopy(T value) {
+ this.value = value;
+ }
+
+ @Override
+ public T apply(ImmutableClassesGiraphConfiguration conf) {
+ return HadoopKryo.createCopy(value);
+ }
+ }
+
+ /**
+ * Supplier from configuration returning DefaultMessageValueFactory instances.
+ *
+ * @param <M> Message type
+ */
+ public static class DefaultMessageFactorySupplierFromConf<M extends Writable>
+ implements SupplierFromConf<MessageValueFactory<M>> {
+ private final Class<M> messageClass;
+
+ public DefaultMessageFactorySupplierFromConf(Class<M> messageClass) {
+ this.messageClass = messageClass;
+ }
+
+ @Override
+ public MessageValueFactory<M> apply(
+ ImmutableClassesGiraphConfiguration conf) {
+ return new DefaultMessageValueFactory<>(messageClass, conf);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/messages/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/messages/package-info.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/messages/package-info.java
new file mode 100644
index 0000000..ba3014c
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/messages/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Utility classes for handling of messages within Pieces
+ */
+package org.apache.giraph.block_app.framework.piece.messages;
http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/package-info.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/package-info.java
new file mode 100644
index 0000000..fbc6e92
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/package-info.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Single execution object - Piece, and related classes.
+ *
+ * AbstractPiece is the parent class of all Pieces. Most frequently,
+ * users should extend the Piece class itself.
+ */
+package org.apache.giraph.block_app.framework.piece;
http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/function/Consumer.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/function/Consumer.java b/giraph-block-app/src/main/java/org/apache/giraph/function/Consumer.java
new file mode 100644
index 0000000..2a0e36a
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/function/Consumer.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.function;
+
+import java.io.Serializable;
+
+
+/**
+ * Function:
+ * (T) -> void
+ *
+ * @param <T> Argument type
+ */
+public interface Consumer<T> extends Serializable {
+ /**
+ * Applies this function to {@code input}
+ */
+ void apply(T input);
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/function/Function.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/function/Function.java b/giraph-block-app/src/main/java/org/apache/giraph/function/Function.java
new file mode 100644
index 0000000..41046ba
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/function/Function.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.function;
+
+import java.io.Serializable;
+
+
+/**
+ * Function:
+ * (F) -> T
+ *
+ * @param <F> Argument type
+ * @param <T> Result type
+ */
+public interface Function<F, T> extends Serializable {
+ /**
+ * Returns the result of applying this function to given {@code input}.
+ *
+ * The returned object may or may not be a new instance,
+ * depending on the implementation.
+ */
+ T apply(F input);
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/function/PairConsumer.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/function/PairConsumer.java b/giraph-block-app/src/main/java/org/apache/giraph/function/PairConsumer.java
new file mode 100644
index 0000000..012ec82
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/function/PairConsumer.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.function;
+
+import java.io.Serializable;
+
+/**
+ * Function:
+ * (T1, T2) -> void
+ *
+ * @param <T1> First argument type
+ * @param <T2> Second argument type
+ */
+public interface PairConsumer<T1, T2> extends Serializable {
+ /**
+ * Applies this function to {@code input1} and {@code input2}
+ */
+ void apply(T1 input1, T2 input2);
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/function/PairFunction.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/function/PairFunction.java b/giraph-block-app/src/main/java/org/apache/giraph/function/PairFunction.java
new file mode 100644
index 0000000..bfff400
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/function/PairFunction.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.function;
+
+import java.io.Serializable;
+
+
+/**
+ * Function:
+ * (F1, F2) -> T
+ *
+ * @param <F1> First argument type
+ * @param <F2> Second argument type
+ * @param <T> Result type
+ */
+public interface PairFunction<F1, F2, T> extends Serializable {
+ /**
+ * Returns the result of applying this function to given
+ * {@code input1} and {@code input2}.
+ *
+ * The returned object may or may not be a new instance,
+ * depending on the implementation.
+ */
+ T apply(F1 input1, F2 input2);
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/function/Supplier.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/function/Supplier.java b/giraph-block-app/src/main/java/org/apache/giraph/function/Supplier.java
new file mode 100644
index 0000000..1813b54
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/function/Supplier.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.function;
+
+import java.io.Serializable;
+
+/**
+ * Function:
+ * () -> T
+ * <br>
+ * Counterpart of com.google.common.base.Supplier that is also
+ * Serializable (it does not extend the Guava interface).
+ *
+ * @param <T> Result type
+ */
+public interface Supplier<T> extends Serializable {
+ /**
+ * Retrieves an instance of the appropriate type. The returned object may or
+ * may not be a new instance, depending on the implementation.
+ */
+ T get();
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/function/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/function/package-info.java b/giraph-block-app/src/main/java/org/apache/giraph/function/package-info.java
new file mode 100644
index 0000000..b089da6
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/function/package-info.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package for interfaces representing different functions,
+ * that all extend Serializable in order for Kryo to be able
+ * to serialize them.
+ *
+ * Even when same interface is present in Guava, we do not extend it
+ * due to @Nullable annotations adding requirement of handling nulls.
+ */
+package org.apache.giraph.function;
http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/function/primitive/IntSupplier.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/function/primitive/IntSupplier.java b/giraph-block-app/src/main/java/org/apache/giraph/function/primitive/IntSupplier.java
new file mode 100644
index 0000000..2cf74e1
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/function/primitive/IntSupplier.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.function.primitive;
+
+/**
+ * Primitive specialization of Function:
+ * () -> int
+ */
+public interface IntSupplier {
+ /** Retrieves an int value. */
+ int get();
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/function/primitive/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/function/primitive/package-info.java b/giraph-block-app/src/main/java/org/apache/giraph/function/primitive/package-info.java
new file mode 100644
index 0000000..fdf40ff
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/function/primitive/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Primitive specializations of interfaces from org.apache.giraph.function
+ * package.
+ */
+package org.apache.giraph.function.primitive;
http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/function/vertex/ConsumerWithVertex.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/function/vertex/ConsumerWithVertex.java b/giraph-block-app/src/main/java/org/apache/giraph/function/vertex/ConsumerWithVertex.java
new file mode 100644
index 0000000..87e7f9b
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/function/vertex/ConsumerWithVertex.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.function.vertex;
+
+import java.io.Serializable;
+
+import org.apache.giraph.function.PairConsumer;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+
+/**
+ * Function:
+ * (vertex, T) -> void
+ *
+ * A class that can consume objects of a single type, when given a vertex.
+ *
+ * @param <I> Vertex id type
+ * @param <V> Vertex value type
+ * @param <E> Edge value type
+ * @param <T> Argument type
+ */
+@SuppressWarnings("rawtypes")
+public interface ConsumerWithVertex<I extends WritableComparable,
+ V extends Writable, E extends Writable, T>
+ extends PairConsumer<Vertex<I, V, E>, T>, Serializable {
+ /**
+ * Applies this function to {@code vertex} and {@code value}
+ */
+ @Override
+ void apply(Vertex<I, V, E> vertex, T value);
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/function/vertex/FunctionWithVertex.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/function/vertex/FunctionWithVertex.java b/giraph-block-app/src/main/java/org/apache/giraph/function/vertex/FunctionWithVertex.java
new file mode 100644
index 0000000..fdab5de
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/function/vertex/FunctionWithVertex.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.function.vertex;
+
+import java.io.Serializable;
+
+import org.apache.giraph.function.PairFunction;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Function:
+ * (vertex, F) -> T
+ *
+ * Determines an output value based on a vertex and an input value.
+ *
+ * @param <I> Vertex id type
+ * @param <V> Vertex value type
+ * @param <E> Edge value type
+ * @param <F> Argument type
+ * @param <T> Result type
+ */
+@SuppressWarnings("rawtypes")
+public interface FunctionWithVertex<I extends WritableComparable,
+ V extends Writable, E extends Writable, F, T>
+ extends PairFunction<Vertex<I, V, E>, F, T>, Serializable {
+ /**
+ * Returns the result of applying this function to given
+ * {@code vertex} and {@code input}.
+ *
+ * The returned object may or may not be a new instance,
+ * depending on the implementation.
+ */
+ @Override
+ T apply(Vertex<I, V, E> vertex, F input);
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/function/vertex/SupplierFromVertex.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/function/vertex/SupplierFromVertex.java b/giraph-block-app/src/main/java/org/apache/giraph/function/vertex/SupplierFromVertex.java
new file mode 100644
index 0000000..bc0f9c1
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/function/vertex/SupplierFromVertex.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.function.vertex;
+
+import java.io.Serializable;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Function:
+ * (vertex) -> T
+ *
+ * A class that can supply objects of a single type, when given a vertex.
+ *
+ * (doesn't extend Function<Vertex<I, V, E>, T>, because of different
+ * method names)
+ *
+ * @param <I> Vertex id type
+ * @param <V> Vertex value type
+ * @param <E> Edge value type
+ * @param <T> Result type
+ */
+@SuppressWarnings("rawtypes")
+public interface SupplierFromVertex<I extends WritableComparable,
+ V extends Writable, E extends Writable, T> extends Serializable {
+ /**
+ * Retrieves an instance of the appropriate type, given a vertex.
+ * The returned object may or may not be a new instance,
+ * depending on the implementation.
+ */
+ T get(Vertex<I, V, E> vertex);
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/function/vertex/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/function/vertex/package-info.java b/giraph-block-app/src/main/java/org/apache/giraph/function/vertex/package-info.java
new file mode 100644
index 0000000..bd5b019
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/function/vertex/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package for interfaces representing functions that additionally
+ * receive the vertex they are evaluated on.
+ */
+package org.apache.giraph.function.vertex;
http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/BlockTestingUtils.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/BlockTestingUtils.java b/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/BlockTestingUtils.java
new file mode 100644
index 0000000..6487d95
--- /dev/null
+++ b/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/BlockTestingUtils.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.block;
+
+import static org.junit.Assert.assertEquals;
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Random;
+
+import org.apache.giraph.block_app.framework.piece.AbstractPiece;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+
+@SuppressWarnings({"unchecked", "rawtypes"})
+public class BlockTestingUtils {
+
+ /** Package-private constructor; every helper in this class is static. */
+ BlockTestingUtils() { }
+
+ /** Number of independent iterator pairs that testRandom advances in random order. */
+ private static final int NUM_TRIALS = 10;
+ /** Repetition count used by testNestedRepeatBlock when wrapping a block in a RepeatBlock. */
+ private static final int REPEAT_TIMES = 10;
+
+ /**
+  * Advances both iterators in lock-step until they are exhausted, letting
+  * CheckIterator validate every step, and prints how many elements were seen.
+  *
+  * @param referenceImpl iterator over the expected pieces
+  * @param testImpl iterator over the pieces under test
+  * @return number of elements produced by the pair of iterators
+  */
+ private static int testSequential(Iterator<? extends AbstractPiece> referenceImpl,
+     Iterator<? extends AbstractPiece> testImpl) {
+   CheckIterator lockStep = new CheckIterator(referenceImpl, testImpl);
+
+   int steps = 0;
+   for (; lockStep.hasNext(); steps++) {
+     lockStep.next();
+   }
+
+   System.out.println("Length is : " + steps);
+   return steps;
+ }
+
+ /**
+  * Tells whether at least one of the given iterators still has elements.
+  * Iterators are queried in list order and the scan stops at the first one
+  * that reports more elements.
+  *
+  * @param arr iterators to inspect
+  * @return true if any iterator has a next element, false otherwise
+  */
+ private static boolean anyHasNext(ArrayList<? extends Iterator> arr) {
+   int i = 0;
+   while (i < arr.size() && !arr.get(i).hasNext()) {
+     i++;
+   }
+   return i < arr.size();
+ }
+
+ /**
+  * Creates NUM_TRIALS pairs of (reference, test) iterators from fresh
+  * .iterator() calls and advances them in a random interleaving, while
+  * CheckIterator compares the two iterators of a pair on every step.
+  * Afterwards verifies that every pair produced exactly {@code length}
+  * pieces, and that the overall count matches.
+  *
+  * @param length number of pieces each pair is expected to produce
+  * @param referenceImpl pieces in the expected order
+  * @param testImpl pieces to test against
+  */
+ private static void testRandom(int length,
+     Iterable<? extends AbstractPiece> referenceImpl,
+     Iterable<? extends AbstractPiece> testImpl) {
+   Random rand = new Random();
+
+   ArrayList<CheckIterator<AbstractPiece>> arr = new ArrayList<>();
+   IntArrayList lengths = new IntArrayList(NUM_TRIALS);
+   for (int i = 0; i < NUM_TRIALS; i++) {
+     arr.add(new CheckIterator(referenceImpl.iterator(), testImpl.iterator()));
+     lengths.add(0);
+   }
+
+   int totalCount = 0;
+   while (anyHasNext(arr)) {
+     // anyHasNext just confirmed that at least one pair has elements left,
+     // so keep drawing indices until such a pair is hit.
+     int index = rand.nextInt(NUM_TRIALS);
+     while (!arr.get(index).hasNext()) {
+       index = rand.nextInt(NUM_TRIALS);
+     }
+     arr.get(index).next();
+     lengths.set(index, lengths.getInt(index) + 1);
+     totalCount++;
+   }
+
+   // Check every pair on its own: the total alone would not notice one pair
+   // running long while another one runs short.
+   for (int i = 0; i < NUM_TRIALS; i++) {
+     assertEquals("Trial " + i + " should have produced length pieces",
+         length, lengths.getInt(i));
+   }
+   assertEquals("TotalCount should be length * NUM_TRIALS", length * NUM_TRIALS, totalCount);
+   System.out.println("Final count is : " + totalCount);
+ }
+
+ /**
+  * Checks that iterating testImpl yields the same piece instances, in the
+  * same order and number, as referenceImpl: first with a single pair of
+  * iterators, then with several pairs obtained from repeated .iterator()
+  * calls and advanced in random order.
+  *
+  * @param referenceImpl a list of pieces in the expected order
+  * @param testImpl the pieces to test against (the Block)
+  */
+ public static void testIndependence(Iterable<? extends AbstractPiece> referenceImpl,
+     Iterable<? extends AbstractPiece> testImpl) {
+   Iterator<? extends AbstractPiece> expected = referenceImpl.iterator();
+   Iterator<? extends AbstractPiece> actual = testImpl.iterator();
+   testRandom(testSequential(expected, actual), referenceImpl, testImpl);
+ }
+
+ /**
+  * Wraps the given block in a RepeatBlock and checks the result with
+  * testIndependence. The pieces of referenceImpl are expected to show up
+  * REPEAT_TIMES times in a row.
+  *
+  * @param referenceImpl a list of pieces in the expected order
+  * @param block the block to test
+  */
+ public static void testNestedRepeatBlock(Iterable<? extends AbstractPiece> referenceImpl, Block block) {
+   Block repeated = new RepeatBlock(REPEAT_TIMES, block);
+   Iterable<? extends AbstractPiece> expected =
+       Iterables.concat(Collections.nCopies(REPEAT_TIMES, referenceImpl));
+   testIndependence(expected, repeated);
+ }
+
+ /**
+  * Iterator that advances two underlying iterators in lock-step and fails
+  * as soon as they disagree, either on hasNext() or on the object returned
+  * by next().
+  *
+  * @param <T> element type of both iterators
+  */
+ public static class CheckIterator<T> implements Iterator<T> {
+
+   private final Iterator<T> fst;
+   private final Iterator<T> snd;
+
+   /**
+    * @param fst first iterator; its elements are the ones returned by next()
+    * @param snd second iterator, checked against the first one
+    */
+   public CheckIterator(Iterator<T> fst, Iterator<T> snd) {
+     this.fst = fst;
+     this.snd = snd;
+   }
+
+   /**
+    * @return whether both iterators have another element
+    * @throws IllegalArgumentException if only one of them has another element
+    */
+   @Override
+   public boolean hasNext() {
+     boolean fstHasNxt = fst.hasNext();
+     boolean sndHasNxt = snd.hasNext();
+     // Report the values that were actually compared instead of querying the
+     // underlying iterators a second time.
+     Preconditions.checkArgument(fstHasNxt == sndHasNxt, "Expect hasNext() on " +
+         "both iterators to be identical. Got: " + fstHasNxt + " and " + sndHasNxt);
+     return fstHasNxt;
+   }
+
+   /**
+    * Advances both iterators by one element.
+    *
+    * @return the element produced by the first iterator
+    * @throws IllegalArgumentException if the two iterators did not return
+    *         the very same object
+    */
+   @Override
+   public T next() {
+     T fstNxt = fst.next();
+     T sndNxt = snd.next();
+     // Reference comparison: both iterators must hand out the same instance.
+     Preconditions.checkArgument(fstNxt == sndNxt, "Expect objs returned by " +
+         "both iterators to be identical. Got: " + fstNxt + " and " + sndNxt);
+     return fstNxt;
+   }
+
+   /**
+    * Removal is not supported.
+    *
+    * @throws UnsupportedOperationException always
+    */
+   @Override
+   public void remove() {
+     throw new UnsupportedOperationException("Not implemented");
+   }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/TestIfBlock.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/TestIfBlock.java b/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/TestIfBlock.java
new file mode 100644
index 0000000..0dacae1
--- /dev/null
+++ b/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/TestIfBlock.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.block;
+
+
+import java.util.Arrays;
+
+import org.apache.giraph.block_app.framework.piece.Piece;
+import org.apache.giraph.function.Supplier;
+import org.junit.Test;
+
+/**
+ * Tests which pieces an IfBlock iterates over for constant conditions.
+ */
+public class TestIfBlock {
+
+  /** Condition that always evaluates to true. */
+  private static final Supplier<Boolean> TRUE_SUPPLIER = new Supplier<Boolean>() {
+    @Override
+    public Boolean get() {
+      return Boolean.TRUE;
+    }
+  };
+
+  /** Condition that always evaluates to false. */
+  private static final Supplier<Boolean> FALSE_SUPPLIER = new Supplier<Boolean>() {
+    @Override
+    public Boolean get() {
+      return Boolean.FALSE;
+    }
+  };
+
+  /** Test short-circuiting the if -> then. */
+  @Test
+  public void testIfBlockThen() throws Exception {
+    Piece first = new Piece();
+    Piece second = new Piece();
+    Block thenBranch = new SequenceBlock(first, second);
+    Block underTest = new IfBlock(TRUE_SUPPLIER, thenBranch);
+
+    BlockTestingUtils.testIndependence(Arrays.asList(first, second), underTest);
+  }
+
+  /** Test short-circuiting the if -> else. */
+  @Test
+  public void testIfBlockElse() throws Exception {
+    Piece first = new Piece();
+    Piece second = new Piece();
+    Block thenBranch = new EmptyBlock();
+    Block elseBranch = new SequenceBlock(first, second);
+    Block underTest = new IfBlock(FALSE_SUPPLIER, thenBranch, elseBranch);
+
+    BlockTestingUtils.testIndependence(Arrays.asList(first, second), underTest);
+  }
+
+  /** Same as the "then" case, but checked through testNestedRepeatBlock. */
+  @Test
+  public void testIfNestedInRepeat() throws Exception {
+    Piece first = new Piece();
+    Piece second = new Piece();
+    Block thenBranch = new SequenceBlock(first, second);
+    Block underTest = new IfBlock(TRUE_SUPPLIER, thenBranch);
+
+    BlockTestingUtils.testNestedRepeatBlock(Arrays.asList(first, second), underTest);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/TestRepeatBlock.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/TestRepeatBlock.java b/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/TestRepeatBlock.java
new file mode 100644
index 0000000..1e096ba
--- /dev/null
+++ b/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/TestRepeatBlock.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.block;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.giraph.block_app.framework.piece.AbstractPiece;
+import org.apache.giraph.block_app.framework.piece.Piece;
+import org.junit.Test;
+
+import com.google.common.collect.Iterables;
+
+/**
+ * Tests that a RepeatBlock iterates over the pieces of its inner block
+ * the requested number of times.
+ */
+public class TestRepeatBlock {
+
+  public static final int REPEAT_TIMES = 5;
+
+  /** Two pieces repeated REPEAT_TIMES times. */
+  @Test
+  public void testRepeatBlockBasic() throws Exception {
+    Piece first = new Piece();
+    Piece second = new Piece();
+    Block repeated = new RepeatBlock(REPEAT_TIMES, new SequenceBlock(first, second));
+
+    Iterable<Piece> expected =
+        Iterables.concat(Collections.nCopies(REPEAT_TIMES, Arrays.asList(first, second)));
+    BlockTestingUtils.testIndependence(expected, repeated);
+  }
+
+  /** The repeat block itself is checked through testNestedRepeatBlock. */
+  @Test
+  public void testNestedRepeatBlock() throws Exception {
+    Piece first = new Piece();
+    Piece second = new Piece();
+    Block repeated = new RepeatBlock(REPEAT_TIMES, new SequenceBlock(first, second));
+
+    Iterable<Piece> expected =
+        Iterables.concat(Collections.nCopies(REPEAT_TIMES, Arrays.asList(first, second)));
+    BlockTestingUtils.testNestedRepeatBlock(expected, repeated);
+  }
+
+  /** Repeating an EmptyBlock is compared against an empty reference list. */
+  @Test
+  public void testRepeatBlockEmpty() throws Exception {
+    Block repeated = new RepeatBlock(REPEAT_TIMES, new EmptyBlock());
+    // Concatenating EmptyIterator = just EmptyIterator. No obj's to
+    // compare against either
+    List<AbstractPiece> noPieces = Collections.emptyList();
+    BlockTestingUtils.testIndependence(noPieces, repeated);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/TestRepeatUntilBlock.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/TestRepeatUntilBlock.java b/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/TestRepeatUntilBlock.java
new file mode 100644
index 0000000..242d376
--- /dev/null
+++ b/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/TestRepeatUntilBlock.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.block;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+
+import org.apache.giraph.block_app.framework.piece.AbstractPiece;
+import org.apache.giraph.block_app.framework.piece.Piece;
+import org.apache.giraph.function.Supplier;
+import org.junit.Test;
+
+import com.google.common.collect.Iterables;
+
+/**
+ * Tests repeatUntilBlock's correctness
+ */
+public class TestRepeatUntilBlock {
+
+  public static final int REPEAT_TIMES = 5;
+
+  /** Condition that always evaluates to false. */
+  private static final Supplier<Boolean> FALSE_SUPPLIER = new Supplier<Boolean>() {
+    @Override
+    public Boolean get() {
+      return false;
+    }
+  };
+
+  /**
+   * With a condition that is always false, the reference is the two pieces
+   * repeated REPEAT_TIMES times.
+   */
+  @Test
+  public void testRepeatUntilBlockBasic() throws Exception {
+    Piece piece1 = new Piece();
+    Piece piece2 = new Piece();
+    Block innerBlock = new SequenceBlock(piece1, piece2);
+    Block repeatBlock = new RepeatUntilBlock(
+      REPEAT_TIMES,
+      innerBlock,
+      FALSE_SUPPLIER
+    );
+    BlockTestingUtils.testIndependence(
+      Iterables.concat(Collections.nCopies(REPEAT_TIMES, Arrays.asList(piece1, piece2))),
+      repeatBlock);
+  }
+
+  /** Same block as the basic case, checked through testNestedRepeatBlock. */
+  @Test
+  public void testNestedRepeatUntilBlock() throws Exception {
+    Piece piece1 = new Piece();
+    Piece piece2 = new Piece();
+    Block innerBlock = new SequenceBlock(piece1, piece2);
+    Block repeatBlock = new RepeatUntilBlock(
+      REPEAT_TIMES,
+      innerBlock,
+      FALSE_SUPPLIER
+    );
+    BlockTestingUtils.testNestedRepeatBlock(
+      Iterables.concat(Collections.nCopies(REPEAT_TIMES, Arrays.asList(piece1, piece2))),
+      repeatBlock);
+  }
+
+  /**
+   * Uses RepeatUntilBlock.unlimited with a stateful condition and expects
+   * the iterator to produce exactly REPEAT_TIMES pieces.
+   */
+  @Test
+  public void testRepeatUntilBlockUnlimited() throws Exception {
+    Block innerBlock = new SequenceBlock(new Piece());
+    // Can't test with testIndependence - spin up our own test inline
+    Supplier<Boolean> countingSupplier = new Supplier<Boolean>() {
+      private int i = 0;
+
+      // Returns false for the first REPEAT_TIMES calls and true afterwards.
+      @Override
+      public Boolean get() {
+        i++;
+        return i > REPEAT_TIMES;
+      }
+    };
+    Block repeatBlock = RepeatUntilBlock.unlimited(
+      innerBlock,
+      countingSupplier
+    );
+    int count = 0;
+    Iterator<AbstractPiece> it = repeatBlock.iterator();
+    while (it.hasNext()) {
+      it.next();
+      count++;
+    }
+    assertEquals("Count must be equal to REPEAT_TIMES", REPEAT_TIMES, count);
+  }
+
+}