You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ni...@apache.org on 2013/01/04 21:52:39 UTC
[4/23] GIRAPH-409: Refactor / cleanups (nitay)
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
new file mode 100644
index 0000000..ec4780e
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.worker;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.comm.WorkerClientRequestProcessor;
+import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
+import org.apache.giraph.graph.GraphState;
+import org.apache.giraph.graph.InputSplitEvents;
+import org.apache.giraph.graph.VertexEdgeCount;
+import org.apache.giraph.time.SystemTime;
+import org.apache.giraph.time.Time;
+import org.apache.giraph.time.Times;
+import org.apache.giraph.zk.ZooKeeperExt;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.Stat;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.Callable;
+
+/**
+ * Abstract base class for loading vertex/edge input splits.
+ * Every thread will have its own instance of WorkerClientRequestProcessor
+ * to send requests.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+public abstract class InputSplitsCallable<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    implements Callable<VertexEdgeCount> {
+  /** Name of counter for vertices loaded */
+  public static final String COUNTER_VERTICES_LOADED = "vertices-loaded";
+  /** Name of counter for edges loaded */
+  public static final String COUNTER_EDGES_LOADED = "edges-loaded";
+  /** Class logger */
+  private static final Logger LOG = Logger.getLogger(InputSplitsCallable.class);
+  /** Class time object */
+  private static final Time TIME = SystemTime.get();
+  /**
+   * Maximum time (in milliseconds) reserveInputSplit() waits for an input
+   * split state change before rescanning the list of input splits.
+   */
+  private static final int STATE_CHANGE_WAIT_MSECS = 60 * 1000;
+  /** Configuration */
+  protected final ImmutableClassesGiraphConfiguration<I, V, E, M>
+      configuration;
+  /** Context */
+  protected final Mapper<?, ?, ?, ?>.Context context;
+  /** Graph state owned by this callable (uses its own request processor) */
+  private final GraphState<I, V, E, M> graphState;
+  /** Handles IPC communication; one instance per callable */
+  private final WorkerClientRequestProcessor<I, V, E, M>
+      workerClientRequestProcessor;
+  /**
+   * Stores and processes the list of InputSplits advertised
+   * in a tree of child znodes by the master.
+   */
+  private final InputSplitPathOrganizer splitOrganizer;
+  /** ZooKeeperExt handle */
+  private final ZooKeeperExt zooKeeperExt;
+  /** Start time in nanos, taken when this callable is constructed */
+  private final long startNanos = TIME.getNanoseconds();
+  /** ZooKeeper input split reserved node. */
+  private final String inputSplitReservedNode;
+  /** ZooKeeper input split finished node. */
+  private final String inputSplitFinishedNode;
+  /** Input split events. */
+  private final InputSplitEvents inputSplitEvents;
+
+  // CHECKSTYLE: stop ParameterNumberCheck
+  /**
+   * Constructor.
+   *
+   * @param context Context
+   * @param graphState Graph state; its superstep, vertex/edge totals and
+   *                   graph mapper are copied into a new GraphState that
+   *                   uses this callable's own request processor
+   * @param configuration Configuration
+   * @param bspServiceWorker service worker
+   * @param inputSplitPathList List of the paths of the input splits
+   * @param workerInfo This worker's info
+   * @param zooKeeperExt Handle to ZooKeeperExt
+   * @param inputSplitReservedNode Path to input split reserved
+   * @param inputSplitFinishedNode Path to input split finished
+   * @param inputSplitEvents Input split events
+   * @throws IllegalStateException if the input split organizer cannot be
+   *                               built from ZooKeeper
+   */
+  public InputSplitsCallable(
+      Mapper<?, ?, ?, ?>.Context context,
+      GraphState<I, V, E, M> graphState,
+      ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
+      BspServiceWorker<I, V, E, M> bspServiceWorker,
+      List<String> inputSplitPathList,
+      WorkerInfo workerInfo,
+      ZooKeeperExt zooKeeperExt,
+      String inputSplitReservedNode,
+      String inputSplitFinishedNode,
+      InputSplitEvents inputSplitEvents) {
+    this.zooKeeperExt = zooKeeperExt;
+    this.context = context;
+    // Each callable owns its request processor (see class comment)
+    this.workerClientRequestProcessor =
+        new NettyWorkerClientRequestProcessor<I, V, E, M>(
+            context, configuration, bspServiceWorker);
+    this.graphState = new GraphState<I, V, E, M>(graphState.getSuperstep(),
+        graphState.getTotalNumVertices(), graphState.getTotalNumEdges(),
+        context, graphState.getGraphMapper(), workerClientRequestProcessor,
+        null);
+    try {
+      splitOrganizer = new InputSplitPathOrganizer(zooKeeperExt,
+          inputSplitPathList, workerInfo.getHostname(), workerInfo.getPort());
+    } catch (KeeperException e) {
+      throw new IllegalStateException(
+          "InputSplitsCallable: KeeperException", e);
+    } catch (InterruptedException e) {
+      // Preserve the interrupt for callers further up the stack
+      Thread.currentThread().interrupt();
+      throw new IllegalStateException(
+          "InputSplitsCallable: InterruptedException", e);
+    }
+    this.configuration = configuration;
+    this.inputSplitReservedNode = inputSplitReservedNode;
+    this.inputSplitFinishedNode = inputSplitFinishedNode;
+    this.inputSplitEvents = inputSplitEvents;
+  }
+  // CHECKSTYLE: resume ParameterNumberCheck
+
+  /**
+   * Load vertices/edges from the given input split.
+   *
+   * @param inputSplit Input split to load
+   * @param graphState Graph state
+   * @return Count of vertices and edges loaded
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  protected abstract VertexEdgeCount readInputSplit(
+      InputSplit inputSplit,
+      GraphState<I, V, E, M> graphState)
+    throws IOException, InterruptedException;
+
+  /**
+   * Repeatedly reserve and load input splits until none remain unreserved.
+   * Then log throughput and flush any pending requests.
+   *
+   * @return Total count of vertices and edges loaded by this callable
+   * @throws IllegalStateException wrapping any checked failure
+   */
+  @Override
+  public VertexEdgeCount call() {
+    VertexEdgeCount vertexEdgeCount = new VertexEdgeCount();
+    String inputSplitPath;
+    int inputSplitsProcessed = 0;
+    try {
+      while ((inputSplitPath = reserveInputSplit()) != null) {
+        vertexEdgeCount = vertexEdgeCount.incrVertexEdgeCount(
+            loadInputSplit(inputSplitPath, graphState));
+        context.progress();
+        ++inputSplitsProcessed;
+      }
+    } catch (KeeperException e) {
+      throw new IllegalStateException("call: KeeperException", e);
+    } catch (InterruptedException e) {
+      // Preserve the interrupt for the executor running this callable
+      Thread.currentThread().interrupt();
+      throw new IllegalStateException("call: InterruptedException", e);
+    } catch (IOException e) {
+      throw new IllegalStateException("call: IOException", e);
+    } catch (ClassNotFoundException e) {
+      throw new IllegalStateException("call: ClassNotFoundException", e);
+    } catch (InstantiationException e) {
+      throw new IllegalStateException("call: InstantiationException", e);
+    } catch (IllegalAccessException e) {
+      throw new IllegalStateException("call: IllegalAccessException", e);
+    }
+
+    if (LOG.isInfoEnabled()) {
+      float seconds = Times.getNanosSince(TIME, startNanos) /
+          Time.NS_PER_SECOND_AS_FLOAT;
+      float verticesPerSecond = vertexEdgeCount.getVertexCount() / seconds;
+      float edgesPerSecond = vertexEdgeCount.getEdgeCount() / seconds;
+      LOG.info("call: Loaded " + inputSplitsProcessed + " " +
+          "input splits in " + seconds + " secs, " + vertexEdgeCount +
+          " " + verticesPerSecond + " vertices/sec, " +
+          edgesPerSecond + " edges/sec");
+    }
+    try {
+      workerClientRequestProcessor.flush();
+    } catch (IOException e) {
+      throw new IllegalStateException("call: Flushing failed.", e);
+    }
+    return vertexEdgeCount;
+  }
+
+  /**
+   * Try to reserve an InputSplit for loading. While InputSplits exist that
+   * are not finished, wait until they are.
+   *
+   * NOTE: iterations on the InputSplit list only halt for each worker when it
+   * has scanned the entire list once and found every split marked RESERVED.
+   * When a worker fails, its Ephemeral RESERVED znodes will disappear,
+   * allowing other iterating workers to claim its previously read splits.
+   * Only when the last worker left iterating on the list fails can a danger
+   * of data loss occur. Since worker failure in INPUT_SUPERSTEP currently
+   * causes job failure, this is OK. As the failure model evolves, this
+   * behavior might need to change.
+   *
+   * @return reserved InputSplit or null if no unfinished InputSplits exist
+   * @throws org.apache.zookeeper.KeeperException
+   * @throws InterruptedException
+   */
+  private String reserveInputSplit()
+    throws KeeperException, InterruptedException {
+    String reservedInputSplitPath = null;
+    while (true) {
+      int reservedInputSplits = 0;
+      for (String nextSplitToClaim : splitOrganizer) {
+        context.progress();
+        String tmpInputSplitReservedPath = nextSplitToClaim +
+            inputSplitReservedNode;
+        Stat reservedStat =
+            zooKeeperExt.exists(tmpInputSplitReservedPath, true);
+        if (reservedStat == null) {
+          try {
+            // Attempt to reserve this InputSplit with an EPHEMERAL znode
+            zooKeeperExt.createExt(tmpInputSplitReservedPath,
+                null,
+                ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                CreateMode.EPHEMERAL,
+                false);
+            reservedInputSplitPath = nextSplitToClaim;
+            if (LOG.isInfoEnabled()) {
+              // Only counts reservations seen so far in this pass, hence
+              // "roughly"
+              float percentFinished =
+                  reservedInputSplits * 100.0f /
+                  splitOrganizer.getPathListSize();
+              LOG.info("reserveInputSplit: Reserved input " +
+                  "split path " + reservedInputSplitPath +
+                  ", overall roughly " + percentFinished +
+                  "% input splits reserved");
+            }
+            return reservedInputSplitPath;
+          } catch (KeeperException.NodeExistsException e) {
+            // Someone else created the reservation first; keep scanning
+            if (LOG.isInfoEnabled()) {
+              LOG.info("reserveInputSplit: Couldn't reserve " +
+                  "(already reserved) inputSplit" +
+                  " at " + tmpInputSplitReservedPath);
+            }
+          } catch (KeeperException e) {
+            throw new IllegalStateException(
+                "reserveInputSplit: KeeperException on reserve", e);
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new IllegalStateException(
+                "reserveInputSplit: InterruptedException " +
+                    "on reserve", e);
+          }
+        } else {
+          ++reservedInputSplits;
+        }
+      }
+      if (LOG.isInfoEnabled()) {
+        LOG.info("reserveInputSplit: reservedPath = " +
+            reservedInputSplitPath + ", " + reservedInputSplits +
+            " of " + splitOrganizer.getPathListSize() +
+            " InputSplits are finished.");
+      }
+      if (reservedInputSplits == splitOrganizer.getPathListSize()) {
+        return null;
+      }
+      // Wait for either a reservation to go away or a notification that
+      // an InputSplit has finished.
+      context.progress();
+      inputSplitEvents.getStateChanged().waitMsecs(STATE_CHANGE_WAIT_MSECS);
+      inputSplitEvents.getStateChanged().reset();
+    }
+  }
+
+  /**
+   * Mark an input split path as completed by this worker. This notifies
+   * the master and the other workers that this input split has not only
+   * been reserved, but also marked processed.
+   *
+   * @param inputSplitPath Path to the input split.
+   */
+  private void markInputSplitPathFinished(String inputSplitPath) {
+    String inputSplitFinishedPath =
+        inputSplitPath + inputSplitFinishedNode;
+    try {
+      zooKeeperExt.createExt(inputSplitFinishedPath,
+          null,
+          ZooDefs.Ids.OPEN_ACL_UNSAFE,
+          CreateMode.PERSISTENT,
+          true);
+    } catch (KeeperException.NodeExistsException e) {
+      LOG.warn("markInputSplitPathFinished: " + inputSplitFinishedPath +
+          " already exists!");
+    } catch (KeeperException e) {
+      throw new IllegalStateException(
+          "markInputSplitPathFinished: KeeperException on " +
+              inputSplitFinishedPath, e);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new IllegalStateException(
+          "markInputSplitPathFinished: InterruptedException on " +
+              inputSplitFinishedPath, e);
+    }
+  }
+
+  /**
+   * Fetch the input split stored at the given ZooKeeper path, load it via
+   * {@link #readInputSplit(InputSplit, GraphState)}, then mark the input
+   * split finished.
+   *
+   * @param inputSplitPath ZK location of input split
+   * @param graphState Current graph state
+   * @return Count of vertices and edges loaded, as returned by
+   *         readInputSplit()
+   * @throws IOException
+   * @throws ClassNotFoundException
+   * @throws InterruptedException
+   * @throws InstantiationException
+   * @throws IllegalAccessException
+   */
+  private VertexEdgeCount loadInputSplit(
+      String inputSplitPath,
+      GraphState<I, V, E, M> graphState)
+    throws IOException, ClassNotFoundException, InterruptedException,
+    InstantiationException, IllegalAccessException {
+    InputSplit inputSplit = getInputSplit(inputSplitPath);
+    VertexEdgeCount vertexEdgeCount =
+        readInputSplit(inputSplit, graphState);
+    if (LOG.isInfoEnabled()) {
+      LOG.info("loadFromInputSplit: Finished loading " +
+          inputSplitPath + " " + vertexEdgeCount);
+    }
+    markInputSplitPathFinished(inputSplitPath);
+    return vertexEdgeCount;
+  }
+
+  /**
+   * Talk to ZooKeeper to convert the input split path to the actual
+   * InputSplit. The znode data holds a location string (skipped), the
+   * InputSplit class name, then the split's Writable serialization.
+   *
+   * @param inputSplitPath Location in ZK of input split
+   * @return instance of InputSplit
+   * @throws IOException
+   * @throws ClassNotFoundException
+   */
+  protected InputSplit getInputSplit(String inputSplitPath)
+    throws IOException, ClassNotFoundException {
+    byte[] splitList;
+    try {
+      splitList = zooKeeperExt.getData(inputSplitPath, false, null);
+    } catch (KeeperException e) {
+      throw new IllegalStateException(
+          "getInputSplit: KeeperException on " + inputSplitPath, e);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new IllegalStateException(
+          "getInputSplit: InterruptedException on " + inputSplitPath, e);
+    }
+    context.progress();
+
+    DataInputStream inputStream =
+        new DataInputStream(new ByteArrayInputStream(splitList));
+    Text.readString(inputStream); // location data unused here, skip
+    String inputSplitClass = Text.readString(inputStream);
+    InputSplit inputSplit = (InputSplit)
+        ReflectionUtils.newInstance(
+            configuration.getClassByName(inputSplitClass),
+            configuration);
+    ((Writable) inputSplit).readFields(inputStream);
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("getInputSplit: Reserved " + inputSplitPath +
+          " from ZooKeeper and got input split '" +
+          inputSplit.toString() + "'");
+    }
+    return inputSplit;
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallableFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallableFactory.java b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallableFactory.java
new file mode 100644
index 0000000..9e8bc32
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallableFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.worker;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Creates {@link InputSplitsCallable} instances on demand.
+ * Implementations return a brand-new callable from every call to
+ * {@link #newCallable()}.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+public interface InputSplitsCallableFactory<
+    I extends WritableComparable,
+    V extends Writable,
+    E extends Writable,
+    M extends Writable> {
+  /**
+   * Build a fresh {@link InputSplitsCallable}.
+   *
+   * @return A newly-created {@link InputSplitsCallable}
+   */
+  InputSplitsCallable<I, V, E, M> newCallable();
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
new file mode 100644
index 0000000..83c8b41
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.worker;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.GraphState;
+import org.apache.giraph.vertex.Vertex;
+import org.apache.giraph.graph.VertexEdgeCount;
+import org.apache.giraph.io.VertexInputFormat;
+import org.apache.giraph.io.VertexReader;
+import org.apache.giraph.partition.PartitionOwner;
+import org.apache.giraph.metrics.GiraphMetrics;
+import org.apache.giraph.metrics.GiraphMetricsRegistry;
+import org.apache.giraph.utils.LoggerUtils;
+import org.apache.giraph.utils.MemoryUtils;
+import org.apache.giraph.zk.ZooKeeperExt;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+import com.yammer.metrics.core.Counter;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Load as many vertex input splits as possible.
+ * Every thread will have its own instance of WorkerClientRequestProcessor
+ * to send requests.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+public class VertexInputSplitsCallable<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    extends InputSplitsCallable<I, V, E, M> {
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(VertexInputSplitsCallable.class);
+  /** Report status each time the running vertex total hits a multiple */
+  private static final long VERTICES_STATUS_UPDATE_PERIOD = 250000;
+  /** Total vertices loaded */
+  private long totalVerticesLoaded = 0;
+  /** Total edges loaded */
+  private long totalEdgesLoaded = 0;
+  /** Input split max vertices (values <= 0 mean no limit) */
+  private final long inputSplitMaxVertices;
+  /** Bsp service worker (only use thread-safe methods) */
+  private final BspServiceWorker<I, V, E, M> bspServiceWorker;
+
+  // Metrics
+  /** number of vertices loaded counter */
+  private final Counter verticesLoadedCounter;
+  /** number of edges loaded counter */
+  private final Counter edgesLoadedCounter;
+
+  /**
+   * Constructor.
+   *
+   * @param context Context
+   * @param graphState Graph state
+   * @param configuration Configuration
+   * @param bspServiceWorker service worker
+   * @param inputSplitPathList List of the paths of the input splits
+   * @param workerInfo This worker's info
+   * @param zooKeeperExt Handle to ZooKeeperExt
+   */
+  public VertexInputSplitsCallable(
+      Mapper<?, ?, ?, ?>.Context context,
+      GraphState<I, V, E, M> graphState,
+      ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
+      BspServiceWorker<I, V, E, M> bspServiceWorker,
+      List<String> inputSplitPathList,
+      WorkerInfo workerInfo,
+      ZooKeeperExt zooKeeperExt) {
+    super(context, graphState, configuration, bspServiceWorker,
+        inputSplitPathList, workerInfo, zooKeeperExt,
+        BspServiceWorker.VERTEX_INPUT_SPLIT_RESERVED_NODE,
+        BspServiceWorker.VERTEX_INPUT_SPLIT_FINISHED_NODE,
+        bspServiceWorker.getVertexInputSplitsEvents());
+
+    inputSplitMaxVertices = configuration.getInputSplitMaxVertices();
+    this.bspServiceWorker = bspServiceWorker;
+
+    // Initialize Metrics
+    GiraphMetricsRegistry jobMetrics = GiraphMetrics.get().perJob();
+    verticesLoadedCounter = jobMetrics.getCounter(COUNTER_VERTICES_LOADED);
+    edgesLoadedCounter = jobMetrics.getCounter(COUNTER_EDGES_LOADED);
+  }
+
+  /**
+   * Read vertices from input split and send each one to its partition
+   * owner. If testing, the user may request a maximum number of vertices
+   * to be read from an input split. The vertex reader is always closed,
+   * even if reading fails.
+   *
+   * @param inputSplit Input split to process with vertex reader
+   * @param graphState Current graph state
+   * @return Vertices and edges loaded from this input split
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws IllegalArgumentException if the reader returns a vertex with
+   *                                  a null id
+   */
+  @Override
+  protected VertexEdgeCount readInputSplit(
+      InputSplit inputSplit,
+      GraphState<I, V, E, M> graphState)
+    throws IOException, InterruptedException {
+    VertexInputFormat<I, V, E, M> vertexInputFormat =
+        configuration.createVertexInputFormat();
+    VertexReader<I, V, E, M> vertexReader =
+        vertexInputFormat.createVertexReader(inputSplit, context);
+    long inputSplitVerticesLoaded = 0;
+    long inputSplitEdgesLoaded = 0;
+    try {
+      vertexReader.initialize(inputSplit, context);
+      while (vertexReader.nextVertex()) {
+        Vertex<I, V, E, M> readerVertex =
+            vertexReader.getCurrentVertex();
+        if (readerVertex.getId() == null) {
+          throw new IllegalArgumentException(
+              "readInputSplit: Vertex reader returned a vertex " +
+                  "without an id! - " + readerVertex);
+        }
+        if (readerVertex.getValue() == null) {
+          readerVertex.setValue(configuration.createVertexValue());
+        }
+        readerVertex.setConf(configuration);
+        readerVertex.setGraphState(graphState);
+
+        PartitionOwner partitionOwner =
+            bspServiceWorker.getVertexPartitionOwner(readerVertex.getId());
+        graphState.getWorkerClientRequestProcessor().sendVertexRequest(
+            partitionOwner, readerVertex);
+        context.progress(); // do this before potential data transfer
+        ++inputSplitVerticesLoaded;
+        inputSplitEdgesLoaded += readerVertex.getNumEdges();
+
+        // Update status every VERTICES_STATUS_UPDATE_PERIOD vertices
+        if (((inputSplitVerticesLoaded + totalVerticesLoaded) %
+            VERTICES_STATUS_UPDATE_PERIOD) == 0) {
+          LoggerUtils.setStatusAndLog(context, LOG, Level.INFO,
+              "readInputSplit: Loaded " +
+                  (inputSplitVerticesLoaded + totalVerticesLoaded) +
+                  " vertices " +
+                  (inputSplitEdgesLoaded + totalEdgesLoaded) + " edges " +
+                  MemoryUtils.getRuntimeMemoryStats());
+        }
+
+        // For sampling, or to limit outlier input splits, the number of
+        // records per input split can be limited
+        if (inputSplitMaxVertices > 0 &&
+            inputSplitVerticesLoaded >= inputSplitMaxVertices) {
+          if (LOG.isInfoEnabled()) {
+            LOG.info("readInputSplit: Leaving the input " +
+                "split early, reached maximum vertices " +
+                inputSplitVerticesLoaded);
+          }
+          break;
+        }
+      }
+    } finally {
+      // Release the reader even when initialization or reading fails
+      vertexReader.close();
+    }
+    totalVerticesLoaded += inputSplitVerticesLoaded;
+    verticesLoadedCounter.inc(inputSplitVerticesLoaded);
+    totalEdgesLoaded += inputSplitEdgesLoaded;
+    edgesLoadedCounter.inc(inputSplitEdgesLoaded);
+    return new VertexEdgeCount(
+        inputSplitVerticesLoaded, inputSplitEdgesLoaded);
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java
new file mode 100644
index 0000000..4bec931
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.worker;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.GraphState;
+import org.apache.giraph.zk.ZooKeeperExt;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+
+import java.util.List;
+
+/**
+ * Factory for {@link VertexInputSplitsCallable}s.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+public class VertexInputSplitsCallableFactory<I extends WritableComparable,
+ V extends Writable, E extends Writable, M extends Writable>
+ implements InputSplitsCallableFactory<I, V, E, M> {
+ /** Mapper context. */
+ private final Mapper<?, ?, ?, ?>.Context context;
+ /** Graph state. */
+ private final GraphState<I, V, E, M> graphState;
+ /** Configuration. */
+ private final ImmutableClassesGiraphConfiguration<I, V, E, M> configuration;
+ /** {@link BspServiceWorker} we're running on. */
+ private final BspServiceWorker<I, V, E, M> bspServiceWorker;
+ /** List of input split paths. */
+ private final List<String> inputSplitPathList;
+ /** Worker info. */
+ private final WorkerInfo workerInfo;
+ /** {@link ZooKeeperExt} for this worker. */
+ private final ZooKeeperExt zooKeeperExt;
+
+ /**
+ * Constructor.
+ *
+ * @param context Mapper context
+ * @param graphState Graph state
+ * @param configuration Configuration
+ * @param bspServiceWorker Calling {@link BspServiceWorker}
+ * @param inputSplitPathList List of input split paths
+ * @param workerInfo Worker info
+ * @param zooKeeperExt {@link ZooKeeperExt} for this worker
+ */
+ public VertexInputSplitsCallableFactory(
+ Mapper<?, ?, ?, ?>.Context context,
+ GraphState<I, V, E, M> graphState,
+ ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
+ BspServiceWorker<I, V, E, M> bspServiceWorker,
+ List<String> inputSplitPathList,
+ WorkerInfo workerInfo,
+ ZooKeeperExt zooKeeperExt) {
+ this.context = context;
+ this.graphState = graphState;
+ this.configuration = configuration;
+ this.bspServiceWorker = bspServiceWorker;
+ this.inputSplitPathList = inputSplitPathList;
+ this.workerInfo = workerInfo;
+ this.zooKeeperExt = zooKeeperExt;
+ }
+
+ @Override
+ public InputSplitsCallable<I, V, E, M> newCallable() {
+ return new VertexInputSplitsCallable<I, V, E, M>(
+ context,
+ graphState,
+ configuration,
+ bspServiceWorker,
+ inputSplitPathList,
+ workerInfo,
+ zooKeeperExt);
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java
new file mode 100644
index 0000000..cafd17b
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.worker;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.comm.aggregators.WorkerAggregatorRequestProcessor;
+import org.apache.giraph.comm.aggregators.AggregatedValueOutputStream;
+import org.apache.giraph.comm.aggregators.AggregatorUtils;
+import org.apache.giraph.comm.aggregators.AllAggregatorServerData;
+import org.apache.giraph.comm.aggregators.OwnerAggregatorServerData;
+import org.apache.giraph.aggregators.Aggregator;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.Progressable;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Maps;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Handler for aggregators on a worker. Provides the aggregated values and
+ * performs aggregations requested by user vertex code (thread-safe). Also
+ * has methods for all superstep coordination related to aggregators.
+ *
+ * At the beginning of every superstep the worker calls prepareSuperstep(),
+ * which blocks until the final aggregates from the previous superstep have
+ * been delivered to the worker.
+ * Next, during the superstep the worker can call aggregate() and
+ * getAggregatedValue() (both are thread-safe): the former computes this
+ * worker's partial aggregates for the current superstep, the latter returns
+ * the (read-only) final aggregates from the previous superstep.
+ * Finally, at the end of the superstep, the worker calls finishSuperstep(),
+ * which propagates non-owned partial aggregates to the owner workers,
+ * and sends the final aggregates from the owner worker to the master.
+ */
+public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage {
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(WorkerAggregatorHandler.class);
+  /**
+   * Final aggregated values from the previous superstep. The map instance
+   * is never replaced; prepareSuperstep() passes it to
+   * AllAggregatorServerData#fillNextSuperstepMapsWhenReady, which presumably
+   * repopulates it.
+   */
+  private final Map<String, Writable> previousAggregatedValueMap =
+      Maps.newHashMap();
+  /**
+   * Aggregators for the current superstep. The map instance is never
+   * replaced; prepareSuperstep() passes it to
+   * AllAggregatorServerData#fillNextSuperstepMapsWhenReady, which presumably
+   * repopulates it.
+   */
+  private final Map<String, Aggregator<Writable>> currentAggregatorMap =
+      Maps.newHashMap();
+  /** Service worker */
+  private final CentralizedServiceWorker<?, ?, ?, ?> serviceWorker;
+  /** Progressable for reporting progress */
+  private final Progressable progressable;
+  /** How big a single aggregator request can be */
+  private final int maxBytesPerAggregatorRequest;
+  /** Giraph configuration */
+  private final ImmutableClassesGiraphConfiguration conf;
+
+  /**
+   * Constructor
+   *
+   * @param serviceWorker Service worker
+   * @param conf Giraph configuration
+   * @param progressable Progressable for reporting progress
+   */
+  public WorkerAggregatorHandler(
+      CentralizedServiceWorker<?, ?, ?, ?> serviceWorker,
+      ImmutableClassesGiraphConfiguration conf,
+      Progressable progressable) {
+    this.serviceWorker = serviceWorker;
+    this.progressable = progressable;
+    this.conf = conf;
+    maxBytesPerAggregatorRequest = conf.getInt(
+        AggregatorUtils.MAX_BYTES_PER_AGGREGATOR_REQUEST,
+        AggregatorUtils.MAX_BYTES_PER_AGGREGATOR_REQUEST_DEFAULT);
+  }
+
+  /**
+   * Aggregate a value into the current superstep's aggregator. Locks on that
+   * aggregator, so concurrent compute threads may call this.
+   *
+   * @param name Name of aggregator
+   * @param value Value to aggregate
+   * @param <A> Aggregated value type
+   * @throws IllegalStateException if no aggregator is registered under name
+   */
+  @Override
+  public <A extends Writable> void aggregate(String name, A value) {
+    Aggregator<Writable> aggregator = currentAggregatorMap.get(name);
+    if (aggregator == null) {
+      throw new IllegalStateException("aggregate: Tried to aggregate value " +
+          "to unregistered aggregator " + name);
+    }
+    synchronized (aggregator) {
+      aggregator.aggregate(value);
+    }
+  }
+
+  /**
+   * Get the final aggregated value from the previous superstep.
+   *
+   * @param name Name of aggregator
+   * @param <A> Aggregated value type
+   * @return Value of the aggregator, or null if none is known under name
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public <A extends Writable> A getAggregatedValue(String name) {
+    // Caller chooses A; a mismatch surfaces as ClassCastException at use site
+    return (A) previousAggregatedValueMap.get(name);
+  }
+
+  /**
+   * Prepare aggregators for the current superstep. Waits for this worker's
+   * aggregator data from the master, distributes it to the other workers,
+   * then waits for the aggregators from all workers and stores them.
+   *
+   * @param requestProcessor Request processor for aggregators
+   * @throws IllegalStateException if distributing aggregators fails with an
+   *     IOException
+   */
+  public void prepareSuperstep(
+      WorkerAggregatorRequestProcessor requestProcessor) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("prepareSuperstep: Start preparing aggregators");
+    }
+    AllAggregatorServerData allAggregatorData =
+        serviceWorker.getServerData().getAllAggregatorData();
+    // Wait for my aggregators
+    Iterable<byte[]> dataToDistribute =
+        allAggregatorData.getDataFromMasterWhenReady();
+    try {
+      // Distribute my aggregators
+      requestProcessor.distributeAggregators(dataToDistribute);
+    } catch (IOException e) {
+      throw new IllegalStateException("prepareSuperstep: " +
+          "IOException occurred while trying to distribute aggregators", e);
+    }
+    // Wait for all other aggregators and store them
+    allAggregatorData.fillNextSuperstepMapsWhenReady(
+        serviceWorker.getWorkerInfoList().size(), previousAggregatedValueMap,
+        currentAggregatorMap);
+    allAggregatorData.reset();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("prepareSuperstep: Aggregators prepared");
+    }
+  }
+
+  /**
+   * Send aggregators to their owners and, in the end, to the master.
+   * Order matters: partial values go to owners first, then this worker waits
+   * for the partial values it owns, then sends its final values to the
+   * master and waits for all requests to complete.
+   *
+   * @param requestProcessor Request processor for aggregators
+   * @throws IllegalStateException if any send fails with an IOException
+   */
+  public void finishSuperstep(
+      WorkerAggregatorRequestProcessor requestProcessor) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("finishSuperstep: Start finishing aggregators");
+    }
+    OwnerAggregatorServerData ownerAggregatorData =
+        serviceWorker.getServerData().getOwnerAggregatorData();
+    sendPartialAggregatesToOwners(requestProcessor, ownerAggregatorData);
+
+    // Wait to receive partial aggregated values from all other workers
+    Iterable<Map.Entry<String, Writable>> myAggregators =
+        ownerAggregatorData.getMyAggregatorValuesWhenReady(
+            serviceWorker.getWorkerInfoList().size());
+
+    sendFinalAggregatesToMaster(requestProcessor, myAggregators);
+    // Wait for master to receive aggregated values before proceeding
+    serviceWorker.getWorkerClient().waitAllRequests();
+
+    ownerAggregatorData.reset();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("finishSuperstep: Aggregators finished");
+    }
+  }
+
+  /**
+   * Send every partial aggregated value of this worker to its owner. Values
+   * the request processor declines to send (this worker is the owner) are
+   * aggregated locally instead. Ends with a flush.
+   *
+   * @param requestProcessor Request processor for aggregators
+   * @param ownerAggregatorData Data for aggregators owned by this worker
+   */
+  private void sendPartialAggregatesToOwners(
+      WorkerAggregatorRequestProcessor requestProcessor,
+      OwnerAggregatorServerData ownerAggregatorData) {
+    for (Map.Entry<String, Aggregator<Writable>> entry :
+        currentAggregatorMap.entrySet()) {
+      try {
+        Writable partialValue = entry.getValue().getAggregatedValue();
+        boolean sent = requestProcessor.sendAggregatedValue(entry.getKey(),
+            partialValue);
+        if (!sent) {
+          // If it's my aggregator, add it directly
+          ownerAggregatorData.aggregate(entry.getKey(), partialValue);
+        }
+      } catch (IOException e) {
+        throw new IllegalStateException("finishSuperstep: " +
+            "IOException occurred while sending aggregator " +
+            entry.getKey() + " to its owner", e);
+      }
+      progressable.progress();
+    }
+    try {
+      requestProcessor.flush();
+    } catch (IOException e) {
+      throw new IllegalStateException("finishSuperstep: " +
+          "IOException occurred while sending aggregators to owners", e);
+    }
+  }
+
+  /**
+   * Serialize the final values of the aggregators this worker owns and send
+   * them to the master. A request is sent whenever the buffer grows past
+   * maxBytesPerAggregatorRequest, plus once more for the remainder.
+   *
+   * @param requestProcessor Request processor for aggregators
+   * @param myAggregators Final values of aggregators owned by this worker
+   */
+  private void sendFinalAggregatesToMaster(
+      WorkerAggregatorRequestProcessor requestProcessor,
+      Iterable<Map.Entry<String, Writable>> myAggregators) {
+    AggregatedValueOutputStream aggregatorOutput =
+        new AggregatedValueOutputStream();
+    for (Map.Entry<String, Writable> entry : myAggregators) {
+      try {
+        int currentSize = aggregatorOutput.addAggregator(entry.getKey(),
+            entry.getValue());
+        if (currentSize > maxBytesPerAggregatorRequest) {
+          requestProcessor.sendAggregatedValuesToMaster(
+              aggregatorOutput.flush());
+        }
+        progressable.progress();
+      } catch (IOException e) {
+        throw new IllegalStateException("finishSuperstep: " +
+            "IOException occurred while writing aggregator " +
+            entry.getKey(), e);
+      }
+    }
+    try {
+      requestProcessor.sendAggregatedValuesToMaster(aggregatorOutput.flush());
+    } catch (IOException e) {
+      throw new IllegalStateException("finishSuperstep: " +
+          "IOException occured while sending aggregators to master", e);
+    }
+  }
+
+  /**
+   * Create new aggregator usage which will be used by one of the compute
+   * threads. When thread-local aggregators are enabled in the configuration
+   * each call returns a new per-thread instance; otherwise this shared
+   * handler is returned.
+   *
+   * @return New aggregator usage
+   */
+  public WorkerThreadAggregatorUsage newThreadAggregatorUsage() {
+    if (AggregatorUtils.useThreadLocalAggregators(conf)) {
+      return new ThreadLocalWorkerAggregatorUsage();
+    } else {
+      return this;
+    }
+  }
+
+  @Override
+  public void finishThreadComputation() {
+    // If we don't use thread-local aggregators, all the aggregated values
+    // are already in this object
+  }
+
+  /**
+   * Not thread-safe implementation of {@link WorkerThreadAggregatorUsage}.
+   * We can use one instance of this object per thread to prevent
+   * synchronizing on each aggregate() call. At the end of the superstep,
+   * values from each of these will be aggregated back to {@link
+   * WorkerAggregatorHandler}
+   */
+  public class ThreadLocalWorkerAggregatorUsage
+      implements WorkerThreadAggregatorUsage {
+    /** Thread-local aggregator map */
+    private final Map<String, Aggregator<Writable>> threadAggregatorMap;
+
+    /**
+     * Constructor
+     *
+     * Creates new instances of all aggregators from
+     * {@link WorkerAggregatorHandler}
+     */
+    @SuppressWarnings("unchecked")
+    public ThreadLocalWorkerAggregatorUsage() {
+      threadAggregatorMap = Maps.newHashMapWithExpectedSize(
+          WorkerAggregatorHandler.this.currentAggregatorMap.size());
+      for (Map.Entry<String, Aggregator<Writable>> entry :
+          WorkerAggregatorHandler.this.currentAggregatorMap.entrySet()) {
+        // getClass() loses the type argument; every value in the map is an
+        // Aggregator<Writable>, so the cast back is safe
+        threadAggregatorMap.put(entry.getKey(),
+            AggregatorUtils.newAggregatorInstance(
+                (Class<Aggregator<Writable>>) entry.getValue().getClass()));
+      }
+    }
+
+    @Override
+    public <A extends Writable> void aggregate(String name, A value) {
+      Aggregator<Writable> aggregator = threadAggregatorMap.get(name);
+      if (aggregator == null) {
+        throw new IllegalStateException("aggregate: " +
+            "Tried to aggregate value to unregistered aggregator " + name);
+      }
+      // No locking: this instance is meant to be confined to one thread
+      aggregator.aggregate(value);
+    }
+
+    @Override
+    public <A extends Writable> A getAggregatedValue(String name) {
+      return WorkerAggregatorHandler.this.<A>getAggregatedValue(name);
+    }
+
+    @Override
+    public void finishThreadComputation() {
+      // Aggregate the values this thread's vertices provided back to
+      // WorkerAggregatorHandler (which synchronizes per aggregator)
+      for (Map.Entry<String, Aggregator<Writable>> entry :
+          threadAggregatorMap.entrySet()) {
+        WorkerAggregatorHandler.this.aggregate(entry.getKey(),
+            entry.getValue().getAggregatedValue());
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorUsage.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorUsage.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorUsage.java
new file mode 100644
index 0000000..75cc6dc
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorUsage.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.worker;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Access point through which vertex code reads aggregated values and feeds
+ * new values into aggregators.
+ */
+public interface WorkerAggregatorUsage {
+  /**
+   * Look up the value of the aggregator registered under the given name.
+   *
+   * @param name Name of aggregator
+   * @param <A> Aggregated value type
+   * @return Value of the aggregator
+   */
+  <A extends Writable> A getAggregatedValue(String name);
+
+  /**
+   * Feed a value into the aggregator registered under the given name.
+   *
+   * @param name Name of aggregator
+   * @param value Value to add
+   * @param <A> Aggregated value type
+   */
+  <A extends Writable> void aggregate(String name, A value);
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java
new file mode 100644
index 0000000..d3ffaea
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.worker;
+
+import org.apache.giraph.graph.GraphState;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Mapper;
+
+/**
+ * Hook for running user code once per worker: around the whole application
+ * and around every superstep. There is one WorkerContext per worker.
+ */
+@SuppressWarnings("rawtypes")
+public abstract class WorkerContext implements WorkerAggregatorUsage {
+  /**
+   * Global graph state. Must be set through setGraphState() before any of
+   * the accessors below are used, since they all dereference it.
+   */
+  private GraphState graphState;
+
+  /**
+   * Install the graph state this context reads from.
+   *
+   * @param graphState Graph state to use from now on
+   */
+  public void setGraphState(GraphState graphState) {
+    this.graphState = graphState;
+  }
+
+  /**
+   * Runs once on each worker, before the first superstep starts.
+   *
+   * @throws IllegalAccessException Thrown for getting the class
+   * @throws InstantiationException Expected instantiation in this method.
+   */
+  public abstract void preApplication() throws InstantiationException,
+      IllegalAccessException;
+
+  /**
+   * Runs once on each worker, after the last superstep ends.
+   */
+  public abstract void postApplication();
+
+  /**
+   * Runs once on each worker, before every superstep starts.
+   */
+  public abstract void preSuperstep();
+
+  /**
+   * Runs once on each worker, after every superstep ends.
+   */
+  public abstract void postSuperstep();
+
+  /**
+   * Current superstep number, as reported by the graph state.
+   *
+   * @return Current superstep
+   */
+  public long getSuperstep() {
+    return graphState.getSuperstep();
+  }
+
+  /**
+   * Vertex count over all workers as of the previous superstep.
+   *
+   * @return Total number of vertices (-1 if first superstep)
+   */
+  public long getTotalNumVertices() {
+    return graphState.getTotalNumVertices();
+  }
+
+  /**
+   * Edge count over all workers as of the previous superstep.
+   *
+   * @return Total number of edges (-1 if first superstep)
+   */
+  public long getTotalNumEdges() {
+    return graphState.getTotalNumEdges();
+  }
+
+  /**
+   * Hadoop mapper context this worker runs inside.
+   *
+   * @return Mapper context
+   */
+  public Mapper.Context getContext() {
+    return graphState.getContext();
+  }
+
+  @Override
+  public <A extends Writable> void aggregate(String name, A value) {
+    workerAggregatorUsage().aggregate(name, value);
+  }
+
+  @Override
+  public <A extends Writable> A getAggregatedValue(String name) {
+    return workerAggregatorUsage().<A>getAggregatedValue(name);
+  }
+
+  /**
+   * Aggregator access point exposed by the current graph state; both
+   * aggregator methods above delegate to it.
+   *
+   * @return Aggregator usage to delegate to
+   */
+  private WorkerAggregatorUsage workerAggregatorUsage() {
+    return graphState.getWorkerAggregatorUsage();
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/worker/WorkerInfo.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerInfo.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerInfo.java
new file mode 100644
index 0000000..265d676
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerInfo.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.worker;
+
+import org.apache.giraph.graph.TaskInfo;
+
+/**
+ * Description of a worker task that is shared with the master and with the
+ * other workers.
+ */
+public class WorkerInfo extends TaskInfo {
+  /**
+   * No-argument constructor so that instances can be created reflectively.
+   */
+  public WorkerInfo() {
+    super();
+  }
+
+  /**
+   * Render this worker as {@code Worker(<task info>)}.
+   *
+   * @return String form wrapping the {@link TaskInfo} representation
+   */
+  @Override
+  public String toString() {
+    return new StringBuilder("Worker(")
+        .append(super.toString())
+        .append(')')
+        .toString();
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/worker/WorkerThreadAggregatorUsage.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerThreadAggregatorUsage.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerThreadAggregatorUsage.java
new file mode 100644
index 0000000..194127e
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerThreadAggregatorUsage.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.worker;
+
+/**
+ * A {@link WorkerAggregatorUsage} that computation threads use, extended
+ * with a hook for signalling that a thread has finished computing.
+ */
+public interface WorkerThreadAggregatorUsage extends WorkerAggregatorUsage {
+  /**
+   * Signal the end of this thread's computation, i.e. the point at which
+   * every vertex it processed has handed its values to the aggregators.
+   */
+  void finishThreadComputation();
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/worker/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/package-info.java b/giraph-core/src/main/java/org/apache/giraph/worker/package-info.java
new file mode 100644
index 0000000..9c37caa
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package of all the worker related things
+ */
+package org.apache.giraph.worker;
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/zk/PredicateLock.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/zk/PredicateLock.java b/giraph-core/src/main/java/org/apache/giraph/zk/PredicateLock.java
index eb61ea4..4937f82 100644
--- a/giraph-core/src/main/java/org/apache/giraph/zk/PredicateLock.java
+++ b/giraph-core/src/main/java/org/apache/giraph/zk/PredicateLock.java
@@ -23,8 +23,8 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import org.apache.giraph.utils.SystemTime;
-import org.apache.giraph.utils.Time;
+import org.apache.giraph.time.SystemTime;
+import org.apache.giraph.time.Time;
import org.apache.hadoop.util.Progressable;
import org.apache.log4j.Logger;
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/test/java/org/apache/giraph/BspCase.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/BspCase.java b/giraph-core/src/test/java/org/apache/giraph/BspCase.java
index e0c3af4..f43efe9 100644
--- a/giraph-core/src/test/java/org/apache/giraph/BspCase.java
+++ b/giraph-core/src/test/java/org/apache/giraph/BspCase.java
@@ -234,7 +234,7 @@ public class BspCase implements Watcher {
*
* @return whether we use a real hadoop instance or not
*/
- boolean runningInDistributedMode() {
+ public boolean runningInDistributedMode() {
return jobTracker != null;
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/test/java/org/apache/giraph/TestBspBasic.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/TestBspBasic.java b/giraph-core/src/test/java/org/apache/giraph/TestBspBasic.java
index 55d45d3..91d536e 100644
--- a/giraph-core/src/test/java/org/apache/giraph/TestBspBasic.java
+++ b/giraph-core/src/test/java/org/apache/giraph/TestBspBasic.java
@@ -30,17 +30,17 @@ import org.apache.giraph.examples.SimpleMsgVertex;
import org.apache.giraph.examples.SimplePageRankVertex;
import org.apache.giraph.examples.SimplePageRankVertex.SimplePageRankVertexInputFormat;
import org.apache.giraph.examples.SimpleShortestPathsVertex;
-import org.apache.giraph.examples.SimpleSumCombiner;
+import org.apache.giraph.combiner.SimpleSumCombiner;
import org.apache.giraph.examples.SimpleSuperstepVertex;
import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat;
import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexOutputFormat;
-import org.apache.giraph.graph.EdgeListVertex;
+import org.apache.giraph.vertex.EdgeListVertex;
import org.apache.giraph.graph.GiraphJob;
-import org.apache.giraph.graph.InputSplitPathOrganizer;
-import org.apache.giraph.graph.TextAggregatorWriter;
-import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.graph.VertexInputFormat;
-import org.apache.giraph.io.JsonLongDoubleFloatDoubleVertexOutputFormat;
+import org.apache.giraph.worker.InputSplitPathOrganizer;
+import org.apache.giraph.aggregators.TextAggregatorWriter;
+import org.apache.giraph.vertex.Vertex;
+import org.apache.giraph.io.VertexInputFormat;
+import org.apache.giraph.io.formats.JsonLongDoubleFloatDoubleVertexOutputFormat;
import org.apache.giraph.zk.ZooKeeperExt;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/test/java/org/apache/giraph/TestEdgeInput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/TestEdgeInput.java b/giraph-core/src/test/java/org/apache/giraph/TestEdgeInput.java
deleted file mode 100644
index 797a410..0000000
--- a/giraph-core/src/test/java/org/apache/giraph/TestEdgeInput.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph;
-
-import org.apache.giraph.conf.GiraphClasses;
-import org.apache.giraph.graph.EdgeListVertex;
-import org.apache.giraph.io.IdWithValueTextOutputFormat;
-import org.apache.giraph.io.IntIntTextVertexValueInputFormat;
-import org.apache.giraph.io.IntNullTextEdgeInputFormat;
-import org.apache.giraph.utils.InternalVertexRunner;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.junit.Test;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
-
-import java.io.IOException;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * A test case to ensure that loading a graph from a list of edges works as
- * expected.
- */
-public class TestEdgeInput extends BspCase {
- /** Names this BSP test case after the test class. */
- public TestEdgeInput() {
- super(TestEdgeInput.class.getName());
- }
-
- // It should be able to build a graph starting from the edges only.
- // Vertices should be implicitly created with default values.
- @Test
- public void testEdgesOnly() throws Exception {
- String[] edges = new String[] {
- "1 2",
- "2 3",
- "2 4",
- "4 1"
- };
-
- GiraphClasses classes = new GiraphClasses();
- classes.setVertexClass(TestVertexWithNumEdges.class);
- classes.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class);
- classes.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
- Map<String, String> params = ImmutableMap.of();
- // No vertex input (null), only the edge list
- Iterable<String> results = InternalVertexRunner.run(classes, params,
- null, edges);
-
- Map<Integer, Integer> values = parseResults(results);
-
- // Check that all vertices with outgoing edges have been created
- // (vertex 3 only appears as an edge target, so 3 vertices are expected)
- assertEquals(3, values.size());
- // Check the number of edges for each vertex
- assertEquals(1, (int) values.get(1));
- assertEquals(2, (int) values.get(2));
- assertEquals(1, (int) values.get(4));
- }
-
- // It should be able to build a graph by specifying vertex data and edges
- // as separate input formats.
- @Test
- public void testMixedFormat() throws Exception {
- String[] vertices = new String[] {
- "1 75",
- "2 34",
- "3 13",
- "4 32"
- };
- String[] edges = new String[] {
- "1 2",
- "2 3",
- "2 4",
- "4 1",
- "5 3"
- };
-
- GiraphClasses classes = new GiraphClasses();
- classes.setVertexClass(TestVertexDoNothing.class);
- classes.setVertexInputFormatClass(IntIntTextVertexValueInputFormat.class);
- classes.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class);
- classes.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
- Map<String, String> emptyParams = ImmutableMap.of();
-
- // Run a job with a vertex that does nothing
- Iterable<String> results = InternalVertexRunner.run(classes, emptyParams,
- vertices, edges);
-
- Map<Integer, Integer> values = parseResults(results);
-
- // Check that all vertices with either initial values or outgoing edges
- // have been created
- assertEquals(5, values.size());
- // Check that the vertices have been created with correct values
- assertEquals(75, (int) values.get(1));
- assertEquals(34, (int) values.get(2));
- assertEquals(13, (int) values.get(3));
- assertEquals(32, (int) values.get(4));
- // A vertex with edges but no initial value should have the default value
- assertEquals(0, (int) values.get(5));
-
- classes = new GiraphClasses();
- classes.setVertexClass(TestVertexWithNumEdges.class);
- classes.setVertexInputFormatClass(IntIntTextVertexValueInputFormat.class);
- classes.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class);
- classes.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
-
- // Run a job with a vertex that counts outgoing edges
- results = InternalVertexRunner.run(classes, emptyParams, vertices, edges);
-
- values = parseResults(results);
-
- // Check the number of edges for each vertex
- assertEquals(1, (int) values.get(1));
- assertEquals(2, (int) values.get(2));
- assertEquals(0, (int) values.get(3));
- assertEquals(1, (int) values.get(4));
- assertEquals(1, (int) values.get(5));
- }
-
- /** Vertex that stores its outgoing-edge count as its value, then halts. */
- public static class TestVertexWithNumEdges extends EdgeListVertex<IntWritable,
- IntWritable, NullWritable, NullWritable> {
- @Override
- public void compute(Iterable<NullWritable> messages) throws IOException {
- setValue(new IntWritable(getNumEdges()));
- voteToHalt();
- }
- }
-
- /** Vertex that halts immediately, leaving its value unchanged. */
- public static class TestVertexDoNothing extends EdgeListVertex<IntWritable,
- IntWritable, NullWritable, NullWritable> {
- @Override
- public void compute(Iterable<NullWritable> messages) throws IOException {
- voteToHalt();
- }
- }
-
- /**
- * Parse whitespace-separated "id value" output lines into a map from
- * vertex id to vertex value.
- *
- * @param results Output lines produced by the job
- * @return Map from vertex id to vertex value
- */
- private static Map<Integer, Integer> parseResults(Iterable<String> results) {
- Map<Integer, Integer> values = Maps.newHashMap();
- for (String line : results) {
- String[] tokens = line.split("\\s+");
- int id = Integer.valueOf(tokens[0]);
- int value = Integer.valueOf(tokens[1]);
- values.put(id, value);
- }
- return values;
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/test/java/org/apache/giraph/TestGiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/TestGiraphConfiguration.java b/giraph-core/src/test/java/org/apache/giraph/TestGiraphConfiguration.java
deleted file mode 100644
index 0f8578a..0000000
--- a/giraph-core/src/test/java/org/apache/giraph/TestGiraphConfiguration.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph;
-
-import org.apache.giraph.conf.GiraphConfiguration;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-public class TestGiraphConfiguration {
- public interface If { }
- public class A implements If { }
- public class B implements If { }
- public class C implements If { }
-
- @Test
- public void testSetClasses() {
- GiraphConfiguration conf = new GiraphConfiguration();
- conf.setClasses("foo", If.class, A.class, B.class);
- Class<?>[] klasses = conf.getClasses("foo");
- assertEquals(2, klasses.length);
- assertEquals(A.class, klasses[0]);
- assertEquals(B.class, klasses[1]);
-
- try {
- conf.setClasses("foo", A.class, B.class);
- fail();
- } catch (RuntimeException e) {
- assertEquals(2, conf.getClasses("foo").length);
- }
-
- Class<? extends If>[] klasses2 = conf.getClassesOfType("foo", If.class);
- assertEquals(2, klasses2.length);
- assertEquals(A.class, klasses2[0]);
- assertEquals(B.class, klasses2[1]);
- }
-
- @Test
- public void testAddToClasses() {
- GiraphConfiguration conf = new GiraphConfiguration();
-
- conf.setClasses("foo", If.class, A.class, B.class);
- conf.addToClasses("foo", C.class, If.class);
- Class<?>[] klasses = conf.getClasses("foo");
- assertEquals(3, klasses.length);
- assertEquals(A.class, klasses[0]);
- assertEquals(B.class, klasses[1]);
- assertEquals(C.class, klasses[2]);
-
- conf.addToClasses("bar", B.class, If.class);
- klasses = conf.getClasses("bar");
- assertEquals(1, klasses.length);
- assertEquals(B.class, klasses[0]);
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/test/java/org/apache/giraph/TestGraphPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/TestGraphPartitioner.java b/giraph-core/src/test/java/org/apache/giraph/TestGraphPartitioner.java
index 6fda813..ff71b86 100644
--- a/giraph-core/src/test/java/org/apache/giraph/TestGraphPartitioner.java
+++ b/giraph-core/src/test/java/org/apache/giraph/TestGraphPartitioner.java
@@ -24,8 +24,8 @@ import org.apache.giraph.examples.SimpleCheckpointVertex;
import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat;
import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexOutputFormat;
import org.apache.giraph.graph.GiraphJob;
-import org.apache.giraph.graph.partition.HashRangePartitionerFactory;
-import org.apache.giraph.graph.partition.PartitionBalancer;
+import org.apache.giraph.partition.HashRangePartitionerFactory;
+import org.apache.giraph.partition.PartitionBalancer;
import org.apache.giraph.integration.SuperstepHashPartitionerFactory;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/test/java/org/apache/giraph/TestJsonBase64Format.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/TestJsonBase64Format.java b/giraph-core/src/test/java/org/apache/giraph/TestJsonBase64Format.java
deleted file mode 100644
index 1e2b0e0..0000000
--- a/giraph-core/src/test/java/org/apache/giraph/TestJsonBase64Format.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph;
-import org.apache.giraph.benchmark.EdgeListVertexPageRankBenchmark;
-import org.apache.giraph.benchmark.PageRankComputation;
-import org.apache.giraph.conf.GiraphClasses;
-import org.apache.giraph.graph.GiraphJob;
-import org.apache.giraph.io.GiraphFileInputFormat;
-import org.apache.giraph.io.JsonBase64VertexInputFormat;
-import org.apache.giraph.io.JsonBase64VertexOutputFormat;
-import org.apache.giraph.io.PseudoRandomVertexInputFormat;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.junit.Test;
-
-import java.io.IOException;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Test out the JsonBase64 format.
- */
-public class TestJsonBase64Format extends BspCase {
- /**
- * Constructor.
- */
- public TestJsonBase64Format() {
- super(TestJsonBase64Format.class.getName());
- }
-
- /**
- * Start a job and finish after i supersteps, then begin a new job and
- * continue on more j supersteps. Check the results against a single job
- * with i + j supersteps.
- *
- * @throws IOException
- * @throws ClassNotFoundException
- * @throws InterruptedException
- */
- @Test
- public void testContinue()
- throws IOException, InterruptedException, ClassNotFoundException {
-
- Path outputPath = getTempPath(getCallingMethodName());
- GiraphClasses classes = new GiraphClasses();
- classes.setVertexClass(EdgeListVertexPageRankBenchmark.class);
- classes.setVertexInputFormatClass(PseudoRandomVertexInputFormat.class);
- classes.setVertexOutputFormatClass(JsonBase64VertexOutputFormat.class);
- GiraphJob job = prepareJob(getCallingMethodName(), classes, outputPath);
- job.getConfiguration().setLong(
- PseudoRandomVertexInputFormat.AGGREGATE_VERTICES, 101);
- job.getConfiguration().setLong(
- PseudoRandomVertexInputFormat.EDGES_PER_VERTEX, 2);
- job.getConfiguration().setInt(PageRankComputation.SUPERSTEP_COUNT, 2);
-
- assertTrue(job.run(true));
-
- Path outputPath2 = getTempPath(getCallingMethodName() + "2");
- classes = new GiraphClasses();
- classes.setVertexClass(EdgeListVertexPageRankBenchmark.class);
- classes.setVertexInputFormatClass(JsonBase64VertexInputFormat.class);
- classes.setVertexOutputFormatClass(JsonBase64VertexOutputFormat.class);
- job = prepareJob(getCallingMethodName(), classes, outputPath2);
- job.getConfiguration().setInt(PageRankComputation.SUPERSTEP_COUNT, 3);
- GiraphFileInputFormat.addVertexInputPath(job.getInternalJob(), outputPath);
- assertTrue(job.run(true));
-
- Path outputPath3 = getTempPath(getCallingMethodName() + "3");
- classes = new GiraphClasses();
- classes.setVertexClass(EdgeListVertexPageRankBenchmark.class);
- classes.setVertexInputFormatClass(PseudoRandomVertexInputFormat.class);
- classes.setVertexOutputFormatClass(JsonBase64VertexOutputFormat.class);
- job = prepareJob(getCallingMethodName(), classes, outputPath3);
- job.getConfiguration().setLong(
- PseudoRandomVertexInputFormat.AGGREGATE_VERTICES, 101);
- job.getConfiguration().setLong(
- PseudoRandomVertexInputFormat.EDGES_PER_VERTEX, 2);
- job.getConfiguration().setInt(PageRankComputation.SUPERSTEP_COUNT, 5);
- assertTrue(job.run(true));
-
- Configuration conf = job.getConfiguration();
-
- assertEquals(101, getNumResults(conf, outputPath));
- assertEquals(101, getNumResults(conf, outputPath2));
- assertEquals(101, getNumResults(conf, outputPath3));
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/test/java/org/apache/giraph/TestMasterObserver.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/TestMasterObserver.java b/giraph-core/src/test/java/org/apache/giraph/TestMasterObserver.java
deleted file mode 100644
index 8609bc3..0000000
--- a/giraph-core/src/test/java/org/apache/giraph/TestMasterObserver.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph;
-
-import org.apache.giraph.conf.GiraphClasses;
-import org.apache.giraph.conf.GiraphConstants;
-import org.apache.giraph.graph.IntNullNullNullVertex;
-import org.apache.giraph.io.IntNullNullNullTextInputFormat;
-import org.apache.giraph.master.DefaultMasterObserver;
-import org.apache.giraph.utils.InternalVertexRunner;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.util.StringUtils;
-import org.junit.Test;
-
-import com.google.common.collect.Maps;
-
-import java.io.IOException;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-
-public class TestMasterObserver {
- public static class NoOpVertex extends IntNullNullNullVertex {
- private int count = 0;
-
- @Override
- public void compute(Iterable<NullWritable> messages) throws IOException {
- if (count == 2) {
- voteToHalt();
- }
- ++count;
- }
- }
-
- public static class Obs extends DefaultMasterObserver {
- public static int preApp = 0;
- public static int preSuperstep = 0;
- public static int postSuperstep = 0;
- public static int postApp = 0;
-
- @Override
- public void preApplication() {
- ++preApp;
- }
-
- @Override
- public void postApplication() {
- ++postApp;
- }
-
- @Override
- public void preSuperstep() {
- ++preSuperstep;
- }
-
- @Override
- public void postSuperstep() {
- ++postSuperstep;
- }
- }
-
- @Test
- public void testGetsCalled() throws Exception {
- assertEquals(0, Obs.postApp);
-
- String[] graph = new String[] { "1", "2", "3" };
-
- Map<String, String> params = Maps.newHashMap();
- String klasses[] = new String[] {
- Obs.class.getName(),
- Obs.class.getName()
- };
- params.put(GiraphConstants.MASTER_OBSERVER_CLASSES,
- StringUtils.arrayToString(klasses));
-
- GiraphClasses classes = new GiraphClasses();
- classes.setVertexClass(NoOpVertex.class);
- classes.setVertexInputFormatClass(IntNullNullNullTextInputFormat.class);
- InternalVertexRunner.run(classes, params, graph);
-
- assertEquals(2, Obs.preApp);
- // 3 supersteps + 1 input superstep * 2 observers = 8 callbacks
- assertEquals(8, Obs.preSuperstep);
- assertEquals(8, Obs.postSuperstep);
- assertEquals(2, Obs.postApp);
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/test/java/org/apache/giraph/TestPageRank.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/TestPageRank.java b/giraph-core/src/test/java/org/apache/giraph/TestPageRank.java
deleted file mode 100644
index e71e11f..0000000
--- a/giraph-core/src/test/java/org/apache/giraph/TestPageRank.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph;
-
-import org.apache.giraph.conf.GiraphClasses;
-import org.apache.giraph.conf.GiraphConfiguration;
-import org.apache.giraph.examples.SimplePageRankVertex;
-import org.apache.giraph.graph.GiraphJob;
-import org.apache.giraph.graph.partition.HashMasterPartitioner;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.junit.Test;
-
-import java.io.IOException;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Test page rank (with and without multithreading)
- */
-public class TestPageRank extends BspCase {
-
- /**
- * Constructor
- */
- public TestPageRank() {
- super(TestPageRank.class.getName());
- }
-
- @Test
- public void testBspPageRankSingleCompute()
- throws ClassNotFoundException, IOException, InterruptedException {
- testPageRank(1);
- }
-
-
- @Test
- public void testPageRankTenThreadsCompute()
- throws ClassNotFoundException, IOException, InterruptedException {
- testPageRank(10);
- }
-
- /**
- * Generic page rank test
- *
- * @param numComputeThreads Number of compute threads to use
- * @throws java.io.IOException
- * @throws ClassNotFoundException
- * @throws InterruptedException
- */
- private void testPageRank(int numComputeThreads)
- throws IOException, InterruptedException, ClassNotFoundException {
- GiraphClasses<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
- classes = new GiraphClasses();
- classes.setVertexClass(SimplePageRankVertex.class);
- classes.setVertexInputFormatClass(
- SimplePageRankVertex.SimplePageRankVertexInputFormat.class);
- classes.setWorkerContextClass(
- SimplePageRankVertex.SimplePageRankVertexWorkerContext.class);
- classes.setMasterComputeClass(
- SimplePageRankVertex.SimplePageRankVertexMasterCompute.class);
- GiraphJob job = prepareJob(getCallingMethodName(), classes);
- GiraphConfiguration conf = job.getConfiguration();
- conf.setNumComputeThreads(numComputeThreads);
- // Set enough partitions to generate randomness on the compute side
- if (numComputeThreads != 1) {
- conf.setInt(HashMasterPartitioner.USER_PARTITION_COUNT,
- numComputeThreads * 5);
- }
- assertTrue(job.run(true));
- if (!runningInDistributedMode()) {
- double maxPageRank =
- SimplePageRankVertex.SimplePageRankVertexWorkerContext.getFinalMax();
- double minPageRank =
- SimplePageRankVertex.SimplePageRankVertexWorkerContext.getFinalMin();
- long numVertices =
- SimplePageRankVertex.SimplePageRankVertexWorkerContext.getFinalSum();
- System.out.println(getCallingMethodName() + ": maxPageRank=" +
- maxPageRank + " minPageRank=" +
- minPageRank + " numVertices=" + numVertices + ", " +
- " numComputeThreads=" + numComputeThreads);
- assertEquals(34.03, maxPageRank, 0.001);
- assertEquals(0.03, minPageRank, 0.00001);
- assertEquals(5l, numVertices);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/test/java/org/apache/giraph/TestPredicateLock.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/TestPredicateLock.java b/giraph-core/src/test/java/org/apache/giraph/TestPredicateLock.java
deleted file mode 100644
index c6043bc..0000000
--- a/giraph-core/src/test/java/org/apache/giraph/TestPredicateLock.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.giraph.utils.Time;
-import org.apache.giraph.zk.BspEvent;
-import org.apache.giraph.zk.PredicateLock;
-import org.apache.hadoop.util.Progressable;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * Ensure that PredicateLock objects work correctly.
- */
-public class TestPredicateLock {
- /** How many times was progress called? */
- private AtomicInteger progressCalled = new AtomicInteger(0);
-
- private static class SignalThread extends Thread {
- private final BspEvent event;
- public SignalThread(BspEvent event) {
- this.event = event;
- }
- public void run() {
- try {
- Thread.sleep(500);
- } catch (InterruptedException e) {
- }
- event.signal();
- }
- }
-
- private Progressable stubContext;
-
- private Progressable getStubProgressable() {
- if (stubContext == null)
- stubContext = new Progressable() {
- @Override
- public void progress() {
- progressCalled.incrementAndGet();
- }
- };
- return stubContext;
- }
-
- @Before
- public void setUp() {
- progressCalled.set(0);
- }
-
- /**
- * SMake sure the the event is not signaled.
- */
- @Test
- public void testWaitMsecsNoEvent() {
- Time mockTime = mock(Time.class);
- when(mockTime.getMilliseconds()).
- thenReturn(0L).thenReturn(2L);
- BspEvent event = new PredicateLock(getStubProgressable(), 1, mockTime);
- boolean gotPredicate = event.waitMsecs(1);
- assertFalse(gotPredicate);
- assertEquals(0, progressCalled.get());
- when(mockTime.getMilliseconds()).
- thenReturn(0L).thenReturn(0L).thenReturn(2L);
- gotPredicate = event.waitMsecs(1);
- assertFalse(gotPredicate);
- assertEquals(1, progressCalled.get());
- }
-
- /**
- * Single threaded case where the event is signaled.
- */
- @Test
- public void testEvent() {
- Time mockTime = mock(Time.class);
- when(mockTime.getMilliseconds()).
- thenReturn(0L).thenReturn(2L);
- BspEvent event = new PredicateLock(getStubProgressable(), 1, mockTime);
- event.signal();
- boolean gotPredicate = event.waitMsecs(2);
- assertTrue(gotPredicate);
- event.reset();
- when(mockTime.getMilliseconds()).
- thenReturn(0L).thenReturn(2L);
- gotPredicate = event.waitMsecs(0);
- assertFalse(gotPredicate);
- }
-
- /**
- * Thread signaled test for {@link PredicateLock#waitForever()}
- */
- @Test
- public void testWaitForever() {
- BspEvent event = new PredicateLock(getStubProgressable());
- Thread signalThread = new SignalThread(event);
- signalThread.start();
- event.waitForever();
- try {
- signalThread.join();
- } catch (InterruptedException e) {
- }
- assertTrue(event.waitMsecs(0));
- }
-
- /**
- * Thread signaled test to make sure the the event is signaled correctly
- *
- * @throws InterruptedException
- */
- @Test
- public void testWaitMsecs() {
- BspEvent event = new PredicateLock(getStubProgressable());
- Thread signalThread = new SignalThread(event);
- signalThread.start();
- boolean gotPredicate = event.waitMsecs(2000);
- assertTrue(gotPredicate);
- try {
- signalThread.join();
- } catch (InterruptedException e) {
- }
- gotPredicate = event.waitMsecs(0);
- assertTrue(gotPredicate);
- }
-}