You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ni...@apache.org on 2013/01/04 21:52:39 UTC
[15/23] GIRAPH-409: Refactor / cleanups (nitay)
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/VertexInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/VertexInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/graph/VertexInputSplitsCallable.java
deleted file mode 100644
index e1e6235..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/VertexInputSplitsCallable.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.partition.PartitionOwner;
-import org.apache.giraph.metrics.GiraphMetrics;
-import org.apache.giraph.metrics.GiraphMetricsRegistry;
-import org.apache.giraph.utils.LoggerUtils;
-import org.apache.giraph.utils.MemoryUtils;
-import org.apache.giraph.zk.ZooKeeperExt;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-
-import com.yammer.metrics.core.Counter;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * Load as many vertex input splits as possible.
- * Every thread will have its own instance of WorkerClientRequestProcessor
- * to send requests.
- *
- * @param <I> Vertex index value
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message data
- */
-public class VertexInputSplitsCallable<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable>
- extends InputSplitsCallable<I, V, E, M> {
- /** Class logger */
- private static final Logger LOG =
- Logger.getLogger(VertexInputSplitsCallable.class);
- /** Total vertices loaded across all splits read by this callable */
- private long totalVerticesLoaded = 0;
- /** Total edges loaded across all splits read by this callable */
- private long totalEdgesLoaded = 0;
- /** Max vertices read per input split (values <= 0 mean no limit) */
- private final long inputSplitMaxVertices;
- /** Bsp service worker (only use thread-safe methods) */
- private final BspServiceWorker<I, V, E, M> bspServiceWorker;
-
- // Metrics
- /** Job-wide counter of vertices loaded */
- private final Counter verticesLoadedCounter;
- /** Job-wide counter of edges loaded */
- private final Counter edgesLoadedCounter;
-
- /**
- * Constructor.
- *
- * @param context Context
- * @param graphState Graph state
- * @param configuration Configuration
- * @param bspServiceWorker service worker
- * @param inputSplitPathList List of the paths of the input splits
- * @param workerInfo This worker's info
- * @param zooKeeperExt Handle to ZooKeeperExt
- */
- public VertexInputSplitsCallable(
- Mapper<?, ?, ?, ?>.Context context,
- GraphState<I, V, E, M> graphState,
- ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
- BspServiceWorker<I, V, E, M> bspServiceWorker,
- List<String> inputSplitPathList,
- WorkerInfo workerInfo,
- ZooKeeperExt zooKeeperExt) {
- super(context, graphState, configuration, bspServiceWorker,
- inputSplitPathList, workerInfo, zooKeeperExt,
- BspServiceWorker.VERTEX_INPUT_SPLIT_RESERVED_NODE,
- BspServiceWorker.VERTEX_INPUT_SPLIT_FINISHED_NODE,
- bspServiceWorker.vertexInputSplitsEvents);
-
- inputSplitMaxVertices = configuration.getInputSplitMaxVertices();
- this.bspServiceWorker = bspServiceWorker;
-
- // Initialize job-wide metrics
- GiraphMetricsRegistry jobMetrics = GiraphMetrics.get().perJob();
- verticesLoadedCounter = jobMetrics.getCounter(COUNTER_VERTICES_LOADED);
- edgesLoadedCounter = jobMetrics.getCounter(COUNTER_EDGES_LOADED);
- }
-
- /**
- * Read vertices from the input split and send each to its partition owner.
- * At most inputSplitMaxVertices are read per split when that value is > 0.
- *
- * @param inputSplit Input split to process with vertex reader
- * @param graphState Current graph state
- * @return Vertices and edges loaded from this input split
- * @throws IOException
- * @throws InterruptedException
- */
- @Override
- protected VertexEdgeCount readInputSplit(
- InputSplit inputSplit,
- GraphState<I, V, E, M> graphState)
- throws IOException, InterruptedException {
- VertexInputFormat<I, V, E, M> vertexInputFormat =
- configuration.createVertexInputFormat();
- VertexReader<I, V, E, M> vertexReader =
- vertexInputFormat.createVertexReader(inputSplit, context);
- vertexReader.initialize(inputSplit, context);
- long inputSplitVerticesLoaded = 0;
- long inputSplitEdgesLoaded = 0;
- while (vertexReader.nextVertex()) {
- Vertex<I, V, E, M> readerVertex =
- vertexReader.getCurrentVertex();
- if (readerVertex.getId() == null) {
- throw new IllegalArgumentException(
- "readInputSplit: Vertex reader returned a vertex " +
- "without an id! - " + readerVertex);
- }
- if (readerVertex.getValue() == null) {
- readerVertex.setValue(configuration.createVertexValue());
- }
- readerVertex.setConf(configuration);
- readerVertex.setGraphState(graphState);
-
- PartitionOwner partitionOwner =
- bspServiceWorker.getVertexPartitionOwner(readerVertex.getId());
- graphState.getWorkerClientRequestProcessor().sendVertexRequest(
- partitionOwner, readerVertex);
- context.progress(); // NOTE(review): runs after sendVertexRequest() above
- ++inputSplitVerticesLoaded;
- inputSplitEdgesLoaded += readerVertex.getNumEdges();
-
- // Update status every 250k vertices loaded by this callable
- if (((inputSplitVerticesLoaded + totalVerticesLoaded) % 250000) == 0) {
- LoggerUtils.setStatusAndLog(context, LOG, Level.INFO,
- "readInputSplit: Loaded " +
- (inputSplitVerticesLoaded + totalVerticesLoaded) +
- " vertices " +
- (inputSplitEdgesLoaded + totalEdgesLoaded) + " edges " +
- MemoryUtils.getRuntimeMemoryStats());
- }
-
- // For sampling, or to limit outlier input splits, the number of
- // vertices per input split can be limited (only when the max is > 0)
- if (inputSplitMaxVertices > 0 &&
- inputSplitVerticesLoaded >= inputSplitMaxVertices) {
- if (LOG.isInfoEnabled()) {
- LOG.info("readInputSplit: Leaving the input " +
- "split early, reached maximum vertices " +
- inputSplitVerticesLoaded);
- }
- break;
- }
- }
- vertexReader.close(); // NOTE(review): not reached if the loop throws
- totalVerticesLoaded += inputSplitVerticesLoaded;
- verticesLoadedCounter.inc(inputSplitVerticesLoaded);
- totalEdgesLoaded += inputSplitEdgesLoaded;
- edgesLoadedCounter.inc(inputSplitEdgesLoaded);
- return new VertexEdgeCount(
- inputSplitVerticesLoaded, inputSplitEdgesLoaded);
- }
-}
-
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/VertexInputSplitsCallableFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/VertexInputSplitsCallableFactory.java b/giraph-core/src/main/java/org/apache/giraph/graph/VertexInputSplitsCallableFactory.java
deleted file mode 100644
index a55c4b0..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/VertexInputSplitsCallableFactory.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.zk.ZooKeeperExt;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.Mapper;
-
-import java.util.List;
-
-/**
- * Factory that builds a new {@link VertexInputSplitsCallable} per call.
- *
- * @param <I> Vertex id
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message data
- */
-public class VertexInputSplitsCallableFactory<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable>
- implements InputSplitsCallableFactory<I, V, E, M> {
- /** Mapper context. */
- private final Mapper<?, ?, ?, ?>.Context context;
- /** Graph state. */
- private final GraphState<I, V, E, M> graphState;
- /** Configuration. */
- private final ImmutableClassesGiraphConfiguration<I, V, E, M> configuration;
- /** {@link BspServiceWorker} we're running on. */
- private final BspServiceWorker<I, V, E, M> bspServiceWorker;
- /** List of input split paths. */
- private final List<String> inputSplitPathList;
- /** Worker info. */
- private final WorkerInfo workerInfo;
- /** {@link ZooKeeperExt} for this worker. */
- private final ZooKeeperExt zooKeeperExt;
-
- /**
- * Constructor; the arguments are passed to every callable created.
- *
- * @param context Mapper context
- * @param graphState Graph state
- * @param configuration Configuration
- * @param bspServiceWorker Calling {@link BspServiceWorker}
- * @param inputSplitPathList List of input split paths
- * @param workerInfo Worker info
- * @param zooKeeperExt {@link ZooKeeperExt} for this worker
- */
- public VertexInputSplitsCallableFactory(
- Mapper<?, ?, ?, ?>.Context context,
- GraphState<I, V, E, M> graphState,
- ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
- BspServiceWorker<I, V, E, M> bspServiceWorker,
- List<String> inputSplitPathList,
- WorkerInfo workerInfo,
- ZooKeeperExt zooKeeperExt) {
- this.context = context;
- this.graphState = graphState;
- this.configuration = configuration;
- this.bspServiceWorker = bspServiceWorker;
- this.inputSplitPathList = inputSplitPathList;
- this.workerInfo = workerInfo;
- this.zooKeeperExt = zooKeeperExt;
- }
-
- @Override
- public InputSplitsCallable<I, V, E, M> newCallable() {
- return new VertexInputSplitsCallable<I, V, E, M>(
- context,
- graphState,
- configuration,
- bspServiceWorker,
- inputSplitPathList,
- workerInfo,
- zooKeeperExt);
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/VertexMutations.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/VertexMutations.java b/giraph-core/src/main/java/org/apache/giraph/graph/VertexMutations.java
index 4548cea..d9be165 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/VertexMutations.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/VertexMutations.java
@@ -26,6 +26,7 @@ import java.util.List;
import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.vertex.Vertex;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.json.JSONException;
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/VertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/VertexOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/graph/VertexOutputFormat.java
deleted file mode 100644
index d4c8e1c..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/VertexOutputFormat.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import java.io.IOException;
-
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-/**
- * Implement to output the graph after the computation. It is modeled
- * directly after the Hadoop OutputFormat.
- *
- * @param <I> Vertex index value
- * @param <V> Vertex value
- * @param <E> Edge value
- */
-@SuppressWarnings("rawtypes")
-public abstract class VertexOutputFormat<
- I extends WritableComparable, V extends Writable, E extends Writable> {
- /**
- * Create a vertex writer for this task. The framework will call
- * {@link VertexWriter#initialize(TaskAttemptContext)} before
- * the writer is used.
- *
- * @param context the information about the task
- * @return a new vertex writer
- * @throws IOException
- * @throws InterruptedException
- */
- public abstract VertexWriter<I, V, E> createVertexWriter(
- TaskAttemptContext context) throws IOException, InterruptedException;
-
- /**
- * Check for validity of the output-specification for the job.
- * (Copied from Hadoop OutputFormat)
- *
- * <p>This is to validate the output specification for the job when
- * it is submitted. Typically checks that it does not already exist,
- * throwing an exception when it already exists, so that output is not
- * overwritten.</p>
- *
- * @param context information about the job
- * @throws IOException when output should not be attempted
- */
- public abstract void checkOutputSpecs(JobContext context)
- throws IOException, InterruptedException;
-
- /**
- * Get the output committer for this output format. This is responsible
- * for ensuring the output is committed correctly.
- * (Copied from Hadoop OutputFormat)
- *
- * @param context the task context
- * @return an output committer
- * @throws IOException
- * @throws InterruptedException
- */
- public abstract OutputCommitter getOutputCommitter(
- TaskAttemptContext context) throws IOException, InterruptedException;
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/VertexReader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/VertexReader.java b/giraph-core/src/main/java/org/apache/giraph/graph/VertexReader.java
deleted file mode 100644
index 77801cc..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/VertexReader.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import java.io.IOException;
-
-/**
- * Analogous to {@link org.apache.hadoop.mapreduce.RecordReader} for vertices.
- * Will read the vertices from an input split.
- *
- * @param <I> Vertex id
- * @param <V> Vertex data
- * @param <E> Edge data
- * @param <M> Message data
- */
-@SuppressWarnings("rawtypes")
-public interface VertexReader<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable> {
- /**
- * Use the input split and context to set up reading the vertices.
- * Guaranteed to be called prior to any other function.
- *
- * @param inputSplit Input split to be used for reading vertices.
- * @param context Context from the task.
- * @throws IOException
- * @throws InterruptedException
- */
- void initialize(InputSplit inputSplit, TaskAttemptContext context)
- throws IOException, InterruptedException;
-
- /**
- * Advance to the next vertex, if there is one.
- * @return false iff there are no more vertices
- * @throws IOException
- * @throws InterruptedException
- */
- boolean nextVertex() throws IOException, InterruptedException;
-
- /**
- * Get the current vertex.
- *
- * @return the current vertex which has been read.
- * nextVertex() should be called first.
- * @throws IOException
- * @throws InterruptedException
- */
- Vertex<I, V, E, M> getCurrentVertex()
- throws IOException, InterruptedException;
-
- /**
- * Close this {@link VertexReader} to future operations.
- *
- * @throws IOException
- */
- void close() throws IOException;
-
- /**
- * How much of the input the {@link VertexReader} has consumed, i.e.
- * processed so far.
- *
- * @return Progress from <code>0.0</code> to <code>1.0</code>.
- * @throws IOException
- * @throws InterruptedException
- */
- float getProgress() throws IOException, InterruptedException;
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/VertexResolver.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/VertexResolver.java b/giraph-core/src/main/java/org/apache/giraph/graph/VertexResolver.java
index 1616290..4a36706 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/VertexResolver.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/VertexResolver.java
@@ -18,6 +18,7 @@
package org.apache.giraph.graph;
+import org.apache.giraph.vertex.Vertex;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -40,8 +41,7 @@ public interface VertexResolver<I extends WritableComparable,
* excluding the normal case (a vertex already exists and has zero or more
* messages sent it to).
*
- * @param vertexId Vertex id (can be used for {@link Vertex}'s
- * initialize())
+ * @param vertexId Vertex id (can be used for {@link Vertex}'s initialize())
* @param vertex Original vertex or null if none
* @param vertexChanges Changes that happened to this vertex or null if none
* @param hasMessages True iff vertex received messages in the last superstep
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/VertexValueInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/VertexValueInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/graph/VertexValueInputFormat.java
deleted file mode 100644
index 804d23e..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/VertexValueInputFormat.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import java.io.IOException;
-
-/**
- * Vertex input format that only allows setting vertex id and value. It can
- * be used in conjunction with {@link EdgeInputFormat}.
- *
- * @param <I> Vertex id
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message value
- */
-public abstract class VertexValueInputFormat<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable>
- extends VertexInputFormat<I, V, E, M> {
- /**
- * Create a {@link VertexValueReader} for a given split. The framework will
- * call {@link VertexValueReader#initialize(InputSplit,
- * TaskAttemptContext)} before the reader is used.
- *
- * @param split The split to be read
- * @param context The information about the task
- * @return A new vertex value reader
- * @throws IOException
- */
- public abstract VertexValueReader<I, V, E, M> createVertexValueReader(
- InputSplit split, TaskAttemptContext context) throws IOException;
-
- @Override
- public final VertexReader<I, V, E, M> createVertexReader(
- InputSplit split, TaskAttemptContext context) throws IOException {
- return createVertexValueReader(split, context); // subclasses override that
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/VertexValueReader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/VertexValueReader.java b/giraph-core/src/main/java/org/apache/giraph/graph/VertexValueReader.java
deleted file mode 100644
index 569714d..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/VertexValueReader.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import java.io.IOException;
-
-/**
- * Vertex reader for {@link VertexValueInputFormat} (vertex id and value only).
- *
- * @param <I> Vertex id
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message value
- */
-public abstract class VertexValueReader<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable>
- extends BasicVertexValueReader<I, V, E, M> {
- /** Configuration, created from the task context in initialize(). */
- private ImmutableClassesGiraphConfiguration<I, V, E, M> configuration;
-
- @Override
- public void initialize(InputSplit inputSplit, TaskAttemptContext context)
- throws IOException, InterruptedException {
- configuration = new ImmutableClassesGiraphConfiguration<I, V, E, M>(
- context.getConfiguration());
- }
-
- @Override
- public final Vertex<I, V, E, M> getCurrentVertex() throws IOException,
- InterruptedException {
- Vertex<I, V, E, M> vertex = getConf().createVertex();
- vertex.initialize(getCurrentVertexId(), getCurrentVertexValue());
- return vertex;
- }
-
- public ImmutableClassesGiraphConfiguration<I, V, E, M> getConf() {
- return configuration; // null until initialize() has been called
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/VertexWriter.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/VertexWriter.java b/giraph-core/src/main/java/org/apache/giraph/graph/VertexWriter.java
deleted file mode 100644
index 7c0e06e..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/VertexWriter.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import java.io.IOException;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-/**
- * Implement to write out the vertices of the graph after the computation.
- *
- * @param <I> Vertex id
- * @param <V> Vertex value
- * @param <E> Edge value
- */
-@SuppressWarnings("rawtypes")
-public interface VertexWriter<I extends WritableComparable, V extends Writable,
- E extends Writable> {
- /**
- * Use the context to set up writing the vertices.
- * Guaranteed to be called prior to any other function.
- *
- * @param context Context used to write the vertices.
- * @throws IOException
- * @throws InterruptedException
- */
- void initialize(TaskAttemptContext context) throws IOException,
- InterruptedException;
-
- /**
- * Writes the next vertex and associated data.
- *
- * @param vertex The vertex to write
- * @throws IOException
- * @throws InterruptedException
- */
- void writeVertex(Vertex<I, V, E, ?> vertex)
- throws IOException, InterruptedException;
-
- /**
- * Close this {@link VertexWriter} to future operations.
- *
- * @param context the context of the task
- * @throws IOException
- * @throws InterruptedException
- */
- void close(TaskAttemptContext context)
- throws IOException, InterruptedException;
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/WorkerAggregatorHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/WorkerAggregatorHandler.java b/giraph-core/src/main/java/org/apache/giraph/graph/WorkerAggregatorHandler.java
deleted file mode 100644
index 8e35998..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/WorkerAggregatorHandler.java
+++ /dev/null
@@ -1,294 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.comm.aggregators.WorkerAggregatorRequestProcessor;
-import org.apache.giraph.comm.aggregators.AggregatedValueOutputStream;
-import org.apache.giraph.comm.aggregators.AggregatorUtils;
-import org.apache.giraph.comm.aggregators.AllAggregatorServerData;
-import org.apache.giraph.comm.aggregators.OwnerAggregatorServerData;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.util.Progressable;
-import org.apache.log4j.Logger;
-
-import com.google.common.collect.Maps;
-
-import java.io.IOException;
-import java.util.Map;
-
-/**
- * Handler for aggregators on a worker. Provides the aggregated values and
- * performs aggregations from user vertex code (thread-safe). Also has
- * methods for all superstep coordination related to aggregators.
- *
- * At the beginning of every superstep each worker calls prepareSuperstep(),
- * which blocks until the final aggregates from the previous superstep have
- * been delivered to the worker.
- * Next, during the superstep the worker can call aggregate() and
- * getAggregatedValue() (both methods are thread safe): the former
- * computes this worker's partial aggregates for the current superstep,
- * the latter returns (read-only) final aggregates from the previous superstep.
- * Finally, at the end of the superstep, the worker calls finishSuperstep(),
- * which propagates non-owned partial aggregates to the owner workers,
- * and sends the final aggregate from the owner worker to the master.
- *
- * Compute threads may instead use a per-thread view obtained from
- * newThreadAggregatorUsage(); each such view must have
- * finishThreadComputation() called on it, otherwise its partial values are
- * never folded back into this handler.
- */
-public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage {
- /** Class logger */
- private static final Logger LOG =
- Logger.getLogger(WorkerAggregatorHandler.class);
- /**
- * Final aggregated values from the previous superstep, keyed by aggregator
- * name. Passed to fillNextSuperstepMapsWhenReady() in prepareSuperstep()
- * (presumably refilled there) and read by getAggregatedValue().
- */
- private Map<String, Writable> previousAggregatedValueMap =
- Maps.newHashMap();
- /**
- * Aggregators for the current superstep, keyed by aggregator name.
- * Passed to fillNextSuperstepMapsWhenReady() in prepareSuperstep();
- * aggregate() rejects any name that is missing from this map.
- */
- private Map<String, Aggregator<Writable>> currentAggregatorMap =
- Maps.newHashMap();
- /** Service worker */
- private final CentralizedServiceWorker<?, ?, ?, ?> serviceWorker;
- /** Progressable for reporting progress */
- private final Progressable progressable;
- /**
- * How big a single aggregator request can be, in bytes. Used in
- * finishSuperstep() to decide when to flush buffered values to the master.
- */
- private final int maxBytesPerAggregatorRequest;
- /** Giraph configuration */
- private final ImmutableClassesGiraphConfiguration conf;
-
- /**
- * Constructor
- *
- * @param serviceWorker Service worker
- * @param conf Giraph configuration
- * @param progressable Progressable for reporting progress
- */
- public WorkerAggregatorHandler(
- CentralizedServiceWorker<?, ?, ?, ?> serviceWorker,
- ImmutableClassesGiraphConfiguration conf,
- Progressable progressable) {
- this.serviceWorker = serviceWorker;
- this.progressable = progressable;
- this.conf = conf;
- maxBytesPerAggregatorRequest = conf.getInt(
- AggregatorUtils.MAX_BYTES_PER_AGGREGATOR_REQUEST,
- AggregatorUtils.MAX_BYTES_PER_AGGREGATOR_REQUEST_DEFAULT);
- }
-
- /**
- * Aggregate a value into the current superstep's aggregator.
- * Safe to call from several compute threads: updates to one aggregator
- * are serialized by synchronizing on that aggregator object.
- *
- * @param name Name of a registered aggregator
- * @param value Value to aggregate
- * @param <A> Aggregated value type
- * @throws IllegalStateException if no aggregator is registered under name
- */
- @Override
- public <A extends Writable> void aggregate(String name, A value) {
- Aggregator<Writable> aggregator = currentAggregatorMap.get(name);
- if (aggregator != null) {
- synchronized (aggregator) {
- aggregator.aggregate(value);
- }
- } else {
- throw new IllegalStateException("aggregate: Tried to aggregate value " +
- "to unregistered aggregator " + name);
- }
- }
-
- /**
- * Get the final value of an aggregator from the previous superstep.
- *
- * @param name Name of the aggregator
- * @param <A> Aggregated value type. The cast is unchecked, so a wrong A
- * only fails (ClassCastException) where the caller uses the result.
- * @return Aggregated value, or null if no value is stored under name
- */
- @Override
- public <A extends Writable> A getAggregatedValue(String name) {
- return (A) previousAggregatedValueMap.get(name);
- }
-
- /**
- * Prepare aggregators for current superstep.
- * Blocks first until this worker's data from the master is ready, then
- * until the aggregator data from all workers has been received. Any
- * IOException while distributing is rethrown as IllegalStateException.
- *
- * @param requestProcessor Request processor for aggregators
- */
- public void prepareSuperstep(
- WorkerAggregatorRequestProcessor requestProcessor) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("prepareSuperstep: Start preparing aggregators");
- }
- AllAggregatorServerData allAggregatorData =
- serviceWorker.getServerData().getAllAggregatorData();
- // Wait for my aggregators
- Iterable<byte[]> dataToDistribute =
- allAggregatorData.getDataFromMasterWhenReady();
- try {
- // Distribute my aggregators (forward what the master sent us to the
- // other workers) before waiting on theirs below
- requestProcessor.distributeAggregators(dataToDistribute);
- } catch (IOException e) {
- throw new IllegalStateException("prepareSuperstep: " +
- "IOException occurred while trying to distribute aggregators", e);
- }
- // Wait for all other aggregators and store them; the worker count tells
- // the server data how many contributions to wait for
- allAggregatorData.fillNextSuperstepMapsWhenReady(
- serviceWorker.getWorkerInfoList().size(), previousAggregatedValueMap,
- currentAggregatorMap);
- // Clear the received data now that it has been copied into our maps
- // (presumably so the next superstep starts clean -- TODO confirm)
- allAggregatorData.reset();
- if (LOG.isDebugEnabled()) {
- LOG.debug("prepareSuperstep: Aggregators prepared");
- }
- }
-
- /**
- * Send aggregators to their owners and in the end to the master.
- * Blocks until the partial values from all workers for the aggregators
- * owned here have arrived, and again until all outstanding worker client
- * requests have completed. Any IOException is rethrown as
- * IllegalStateException.
- *
- * @param requestProcessor Request processor for aggregators
- */
- public void finishSuperstep(
- WorkerAggregatorRequestProcessor requestProcessor) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("finishSuperstep: Start finishing aggregators");
- }
- OwnerAggregatorServerData ownerAggregatorData =
- serviceWorker.getServerData().getOwnerAggregatorData();
- // First send partial aggregated values to their owners and determine
- // which aggregators belong to this worker
- for (Map.Entry<String, Aggregator<Writable>> entry :
- currentAggregatorMap.entrySet()) {
- try {
- // sendAggregatedValue() returns false when this worker is the owner
- boolean sent = requestProcessor.sendAggregatedValue(entry.getKey(),
- entry.getValue().getAggregatedValue());
- if (!sent) {
- // If it's my aggregator, add it directly
- ownerAggregatorData.aggregate(entry.getKey(),
- entry.getValue().getAggregatedValue());
- }
- } catch (IOException e) {
- throw new IllegalStateException("finishSuperstep: " +
- "IOException occurred while sending aggregator " +
- entry.getKey() + " to its owner", e);
- }
- progressable.progress();
- }
- try {
- // Flush
- requestProcessor.flush();
- } catch (IOException e) {
- throw new IllegalStateException("finishSuperstep: " +
- "IOException occurred while sending aggregators to owners", e);
- }
-
- // Wait to receive partial aggregated values from all other workers
- Iterable<Map.Entry<String, Writable>> myAggregators =
- ownerAggregatorData.getMyAggregatorValuesWhenReady(
- serviceWorker.getWorkerInfoList().size());
-
- // Send final aggregated values to master, batching them and sending a
- // request whenever the buffer grows beyond maxBytesPerAggregatorRequest
- AggregatedValueOutputStream aggregatorOutput =
- new AggregatedValueOutputStream();
- for (Map.Entry<String, Writable> entry : myAggregators) {
- try {
- int currentSize = aggregatorOutput.addAggregator(entry.getKey(),
- entry.getValue());
- if (currentSize > maxBytesPerAggregatorRequest) {
- requestProcessor.sendAggregatedValuesToMaster(
- aggregatorOutput.flush());
- }
- progressable.progress();
- } catch (IOException e) {
- throw new IllegalStateException("finishSuperstep: " +
- "IOException occurred while writing aggregator " +
- entry.getKey(), e);
- }
- }
- try {
- // Send whatever is still buffered (called even if the buffer may be
- // empty)
- requestProcessor.sendAggregatedValuesToMaster(aggregatorOutput.flush());
- } catch (IOException e) {
- throw new IllegalStateException("finishSuperstep: " +
- "IOException occured while sending aggregators to master", e);
- }
- // Wait for master to receive aggregated values before proceeding
- serviceWorker.getWorkerClient().waitAllRequests();
-
- ownerAggregatorData.reset();
- if (LOG.isDebugEnabled()) {
- LOG.debug("finishSuperstep: Aggregators finished");
- }
- }
-
- /**
- * Create new aggregator usage which will be used by one of the compute
- * threads.
- * When thread-local aggregators are disabled in the configuration this
- * handler itself is returned. Otherwise a new
- * ThreadLocalWorkerAggregatorUsage is returned; it copies the set of
- * aggregator names at construction, so NOTE(review): create it only after
- * prepareSuperstep() for the current superstep.
- *
- * @return New aggregator usage
- */
- public WorkerThreadAggregatorUsage newThreadAggregatorUsage() {
- if (AggregatorUtils.useThreadLocalAggregators(conf)) {
- return new ThreadLocalWorkerAggregatorUsage();
- } else {
- return this;
- }
- }
-
- @Override
- public void finishThreadComputation() {
- // If we don't use thread-local aggregators, all the aggregated values
- // are already in this object
- }
-
- /**
- * Not thread-safe implementation of {@link WorkerThreadAggregatorUsage}.
- * We can use one instance of this object per thread to prevent
- * synchronizing on each aggregate() call. At the end of the superstep,
- * values from each of these will be aggregated back to {@link
- * WorkerAggregatorHandler}
- */
- public class ThreadLocalWorkerAggregatorUsage
- implements WorkerThreadAggregatorUsage {
- /** Thread-local aggregator map */
- private final Map<String, Aggregator<Writable>> threadAggregatorMap;
-
- /**
- * Constructor
- *
- * Creates new instances of all aggregators from
- * {@link WorkerAggregatorHandler}
- */
- public ThreadLocalWorkerAggregatorUsage() {
- threadAggregatorMap = Maps.newHashMapWithExpectedSize(
- WorkerAggregatorHandler.this.currentAggregatorMap.size());
- // One fresh aggregator of the same class per registered name; only the
- // class is copied, not the shared aggregator's current value
- for (Map.Entry<String, Aggregator<Writable>> entry :
- WorkerAggregatorHandler.this.currentAggregatorMap.entrySet()) {
- threadAggregatorMap.put(entry.getKey(),
- AggregatorUtils.newAggregatorInstance(
- (Class<Aggregator<Writable>>) entry.getValue().getClass()));
- }
- }
-
- @Override
- public <A extends Writable> void aggregate(String name, A value) {
- // No locking: this map and its aggregators belong to one thread
- Aggregator<Writable> aggregator = threadAggregatorMap.get(name);
- if (aggregator != null) {
- aggregator.aggregate(value);
- } else {
- throw new IllegalStateException("aggregate: " +
- "Tried to aggregate value to unregistered aggregator " + name);
- }
- }
-
- @Override
- public <A extends Writable> A getAggregatedValue(String name) {
- // Previous-superstep values are shared, so read them from the handler
- return WorkerAggregatorHandler.this.<A>getAggregatedValue(name);
- }
-
- @Override
- public void finishThreadComputation() {
- // Aggregate the values this thread's vertices provided back to
- // WorkerAggregatorHandler (its aggregate() does the synchronization)
- for (Map.Entry<String, Aggregator<Writable>> entry :
- threadAggregatorMap.entrySet()) {
- WorkerAggregatorHandler.this.aggregate(entry.getKey(),
- entry.getValue().getAggregatedValue());
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/WorkerAggregatorUsage.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/WorkerAggregatorUsage.java b/giraph-core/src/main/java/org/apache/giraph/graph/WorkerAggregatorUsage.java
deleted file mode 100644
index b7b98cd..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/WorkerAggregatorUsage.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.hadoop.io.Writable;
-
-/**
- * Vertex classes can access and change aggregators through this interface.
- */
-public interface WorkerAggregatorUsage {
- /**
- * Add a new value to the named aggregator.
- * Implementations may reject names that are not registered (for example,
- * WorkerAggregatorHandler throws IllegalStateException).
- *
- * @param name Name of aggregator
- * @param value Value to add
- * @param <A> Type of the aggregated value
- */
- <A extends Writable> void aggregate(String name, A value);
-
- /**
- * Get value of an aggregator.
- * Which superstep the value comes from is up to the implementation;
- * WorkerAggregatorHandler returns the final value of the previous
- * superstep.
- *
- * @param name Name of aggregator
- * @param <A> Type of the aggregated value; the caller chooses it, so it
- * must match the actual stored type
- * @return Value of the aggregator
- */
- <A extends Writable> A getAggregatedValue(String name);
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/WorkerContext.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/WorkerContext.java b/giraph-core/src/main/java/org/apache/giraph/graph/WorkerContext.java
deleted file mode 100644
index 5f5f6fa..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/WorkerContext.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.Mapper;
-
-/**
- * Hook for user code that runs on a per-worker basis. Every worker owns
- * exactly one WorkerContext, whose callbacks run around the whole
- * application and around each superstep. Aggregator calls are forwarded to
- * the aggregator usage exposed by the current {@link GraphState}.
- */
-@SuppressWarnings("rawtypes")
-public abstract class WorkerContext implements WorkerAggregatorUsage {
-  /** Global graph state, injected through setGraphState() */
-  private GraphState graphState;
-
-  /**
-   * Inject the graph state that all accessors of this class read from.
-   *
-   * @param graphState Graph state to read from
-   */
-  public void setGraphState(GraphState graphState) {
-    this.graphState = graphState;
-  }
-
-  /**
-   * User hook run once on every worker, before the first superstep.
-   *
-   * @throws InstantiationException Expected instantiation in this method.
-   * @throws IllegalAccessException Thrown for getting the class
-   */
-  public abstract void preApplication() throws InstantiationException,
-      IllegalAccessException;
-
-  /**
-   * User hook run once on every worker, after the last superstep.
-   */
-  public abstract void postApplication();
-
-  /**
-   * User hook run on every worker before each superstep.
-   */
-  public abstract void preSuperstep();
-
-  /**
-   * User hook run on every worker after each superstep.
-   */
-  public abstract void postSuperstep();
-
-  /**
-   * Current superstep number.
-   *
-   * @return Current superstep
-   */
-  public long getSuperstep() {
-    return graphState.getSuperstep();
-  }
-
-  /**
-   * Number of vertices over all workers in the previous superstep.
-   *
-   * @return Total number of vertices (-1 if first superstep)
-   */
-  public long getTotalNumVertices() {
-    return graphState.getTotalNumVertices();
-  }
-
-  /**
-   * Number of edges over all workers in the previous superstep.
-   *
-   * @return Total number of edges (-1 if first superstep)
-   */
-  public long getTotalNumEdges() {
-    return graphState.getTotalNumEdges();
-  }
-
-  /**
-   * Mapper context of the task running this worker.
-   *
-   * @return Mapper context
-   */
-  public Mapper.Context getContext() {
-    return graphState.getContext();
-  }
-
-  /**
-   * Aggregator usage that aggregator calls are forwarded to.
-   *
-   * @return The graph state's worker aggregator usage
-   */
-  private WorkerAggregatorUsage aggregatorUsage() {
-    return graphState.getWorkerAggregatorUsage();
-  }
-
-  @Override
-  public <A extends Writable> void aggregate(String name, A value) {
-    aggregatorUsage().aggregate(name, value);
-  }
-
-  @Override
-  public <A extends Writable> A getAggregatedValue(String name) {
-    return aggregatorUsage().<A>getAggregatedValue(name);
-  }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/WorkerInfo.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/WorkerInfo.java b/giraph-core/src/main/java/org/apache/giraph/graph/WorkerInfo.java
deleted file mode 100644
index c9571ba..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/WorkerInfo.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-/**
- * Describes a worker; instances are sent to the master and to the other
- * workers.
- */
-public class WorkerInfo extends TaskInfo {
-  /**
-   * No-argument constructor so that instances can be created reflectively.
-   */
-  public WorkerInfo() {
-    super();
-  }
-
-  @Override
-  public String toString() {
-    return new StringBuilder("Worker(")
-        .append(super.toString())
-        .append(')')
-        .toString();
-  }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/WorkerThreadAggregatorUsage.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/WorkerThreadAggregatorUsage.java b/giraph-core/src/main/java/org/apache/giraph/graph/WorkerThreadAggregatorUsage.java
deleted file mode 100644
index e36ff48..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/WorkerThreadAggregatorUsage.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-/**
- * {@link WorkerAggregatorUsage} which can be used in each of the
- * computation threads.
- *
- * Implementations may buffer aggregated values per thread (as
- * WorkerAggregatorHandler.ThreadLocalWorkerAggregatorUsage does), so
- * finishThreadComputation() must be called once the thread is done for
- * those values to take effect.
- */
-public interface WorkerThreadAggregatorUsage extends WorkerAggregatorUsage {
- /**
- * Call this after the thread's computation is finished,
- * i.e. when all vertices have provided their values to aggregators.
- * Implementations that aggregate directly into shared state may treat
- * this as a no-op.
- */
- void finishThreadComputation();
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/partition/BasicPartitionOwner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/BasicPartitionOwner.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/BasicPartitionOwner.java
deleted file mode 100644
index e063ac7..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/partition/BasicPartitionOwner.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph.partition;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.WorkerInfo;
-
-/**
- * Basic partition owner, can be subclassed for more complicated partition
- * owner implementations.
- */
-public class BasicPartitionOwner implements PartitionOwner,
-    ImmutableClassesGiraphConfigurable {
-  /** Configuration */
-  private ImmutableClassesGiraphConfiguration conf;
-  /** Partition id */
-  private int partitionId = -1;
-  /** Owning worker information; must be set before write() is called */
-  private WorkerInfo workerInfo;
-  /** Previous (if any) worker info; null when there is none */
-  private WorkerInfo previousWorkerInfo;
-  /** Checkpoint files prefix for this partition; null when there is none */
-  private String checkpointFilesPrefix;
-
-  /**
-   * Default constructor.
-   */
-  public BasicPartitionOwner() { }
-
-  /**
-   * Constructor with partition id and worker info.
-   *
-   * @param partitionId Partition id of this partition.
-   * @param workerInfo Owner of the partition.
-   */
-  public BasicPartitionOwner(int partitionId, WorkerInfo workerInfo) {
-    this(partitionId, workerInfo, null, null);
-  }
-
-  /**
-   * Constructor with partition id, owner, previous owner and checkpoint
-   * files prefix.
-   *
-   * @param partitionId Partition id of this partition.
-   * @param workerInfo Owner of the partition.
-   * @param previousWorkerInfo Previous owner of this partition (may be null).
-   * @param checkpointFilesPrefix Prefix of the checkpoint files (may be null).
-   */
-  public BasicPartitionOwner(int partitionId,
-                             WorkerInfo workerInfo,
-                             WorkerInfo previousWorkerInfo,
-                             String checkpointFilesPrefix) {
-    this.partitionId = partitionId;
-    this.workerInfo = workerInfo;
-    this.previousWorkerInfo = previousWorkerInfo;
-    this.checkpointFilesPrefix = checkpointFilesPrefix;
-  }
-
-  @Override
-  public int getPartitionId() {
-    return partitionId;
-  }
-
-  @Override
-  public WorkerInfo getWorkerInfo() {
-    return workerInfo;
-  }
-
-  @Override
-  public void setWorkerInfo(WorkerInfo workerInfo) {
-    this.workerInfo = workerInfo;
-  }
-
-  @Override
-  public WorkerInfo getPreviousWorkerInfo() {
-    return previousWorkerInfo;
-  }
-
-  @Override
-  public void setPreviousWorkerInfo(WorkerInfo workerInfo) {
-    this.previousWorkerInfo = workerInfo;
-  }
-
-  @Override
-  public String getCheckpointFilesPrefix() {
-    return checkpointFilesPrefix;
-  }
-
-  @Override
-  public void setCheckpointFilesPrefix(String checkpointFilesPrefix) {
-    this.checkpointFilesPrefix = checkpointFilesPrefix;
-  }
-
-  /**
-   * Deserialize this owner, replacing every serialized field. The wire
-   * format mirrors {@link #write(DataOutput)}: the partition id, the owning
-   * WorkerInfo, then for the previous worker info and the checkpoint files
-   * prefix a boolean presence flag followed by the value when present.
-   *
-   * @param input Input to read from
-   * @throws IOException if reading fails
-   */
-  @Override
-  public void readFields(DataInput input) throws IOException {
-    partitionId = input.readInt();
-    workerInfo = new WorkerInfo();
-    workerInfo.readFields(input);
-    // Reset the optional fields first, so that readFields() on a reused
-    // instance cannot keep a value from an earlier read when the flag
-    // says the value is absent.
-    previousWorkerInfo = null;
-    if (input.readBoolean()) {
-      previousWorkerInfo = new WorkerInfo();
-      previousWorkerInfo.readFields(input);
-    }
-    checkpointFilesPrefix = null;
-    if (input.readBoolean()) {
-      checkpointFilesPrefix = input.readUTF();
-    }
-  }
-
-  /**
-   * Serialize this owner in the format read by
-   * {@link #readFields(DataInput)}. The configuration is not serialized.
-   *
-   * @param output Output to write to
-   * @throws IOException if writing fails
-   */
-  @Override
-  public void write(DataOutput output) throws IOException {
-    output.writeInt(partitionId);
-    workerInfo.write(output);
-    output.writeBoolean(previousWorkerInfo != null);
-    if (previousWorkerInfo != null) {
-      previousWorkerInfo.write(output);
-    }
-    output.writeBoolean(checkpointFilesPrefix != null);
-    if (checkpointFilesPrefix != null) {
-      output.writeUTF(checkpointFilesPrefix);
-    }
-  }
-
-  @Override
-  public ImmutableClassesGiraphConfiguration getConf() {
-    return conf;
-  }
-
-  @Override
-  public void setConf(ImmutableClassesGiraphConfiguration conf) {
-    this.conf = conf;
-  }
-
-  @Override
-  public String toString() {
-    return "(id=" + partitionId + ",cur=" + workerInfo + ",prev=" +
-        previousWorkerInfo + ",ckpt_file=" + checkpointFilesPrefix + ")";
-  }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/partition/ByteArrayPartition.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/ByteArrayPartition.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/ByteArrayPartition.java
deleted file mode 100644
index 5460a55..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/partition/ByteArrayPartition.java
+++ /dev/null
@@ -1,277 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.graph.partition;
-
-import com.google.common.collect.MapMaker;
-
-import com.google.common.primitives.Ints;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.concurrent.ConcurrentMap;
-
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.utils.UnsafeByteArrayInputStream;
-import org.apache.giraph.utils.WritableUtils;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.util.Progressable;
-import org.apache.log4j.Logger;
-
-/**
- * Partition that keeps every vertex serialized in a byte array, which should
- * reduce the amount of memory used compared to keeping vertex objects.
- * All methods that return a vertex (getVertex, putVertex, removeVertex and
- * the iterator) deserialize into one shared representative vertex object.
- * A returned vertex is therefore only valid until the next such call, and
- * only one thread at a time may use these methods.
- *
- * @param <I> Vertex index value
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message data
- */
-public class ByteArrayPartition<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
-    implements Partition<I, V, E, M> {
-  /** Class logger */
-  private static final Logger LOG = Logger.getLogger(ByteArrayPartition.class);
-  /** Configuration from the worker */
-  private ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
-  /** Partition id */
-  private int id;
-  /**
-   * Vertex map for this range (keyed by index). Note that the byte[] is a
-   * serialized vertex with the first four bytes as the length of the vertex
-   * to read.
-   */
-  private ConcurrentMap<I, byte[]> vertexMap;
-  /** Context used to report progress */
-  private Progressable progressable;
-  /** Single vertex object reused by every deserialization */
-  private Vertex<I, V, E, M> representativeVertex;
-  /** Use unsafe serialization */
-  private boolean useUnsafeSerialization;
-
-  /**
-   * Constructor for reflection.
-   */
-  public ByteArrayPartition() { }
-
-  @Override
-  public void initialize(int partitionId, Progressable progressable) {
-    setId(partitionId);
-    setProgressable(progressable);
-    vertexMap = new MapMaker().concurrencyLevel(
-        conf.getNettyServerExecutionConcurrency()).makeMap();
-    representativeVertex = conf.createVertex();
-    useUnsafeSerialization = conf.useUnsafeSerialization();
-  }
-
-  /**
-   * Deserialize vertex bytes into the shared representative vertex.
-   *
-   * @param vertexData Serialized, length-prefixed vertex
-   * @return The representative vertex, now holding the contents of vertexData
-   */
-  private Vertex<I, V, E, M> readRepresentativeVertex(byte[] vertexData) {
-    WritableUtils.readFieldsFromByteArrayWithSize(
-        vertexData, representativeVertex, useUnsafeSerialization);
-    return representativeVertex;
-  }
-
-  @Override
-  public Vertex<I, V, E, M> getVertex(I vertexIndex) {
-    byte[] vertexData = vertexMap.get(vertexIndex);
-    return vertexData == null ? null : readRepresentativeVertex(vertexData);
-  }
-
-  @Override
-  public Vertex<I, V, E, M> putVertex(Vertex<I, V, E, M> vertex) {
-    byte[] vertexData =
-        WritableUtils.writeToByteArrayWithSize(vertex, useUnsafeSerialization);
-    byte[] oldVertexData = vertexMap.put(vertex.getId(), vertexData);
-    return oldVertexData == null ? null :
-        readRepresentativeVertex(oldVertexData);
-  }
-
-  @Override
-  public Vertex<I, V, E, M> removeVertex(I vertexIndex) {
-    byte[] vertexData = vertexMap.remove(vertexIndex);
-    return vertexData == null ? null : readRepresentativeVertex(vertexData);
-  }
-
-  /**
-   * Copy all vertices of another ByteArrayPartition into this one. Vertices
-   * with the same id are replaced by the other partition's data.
-   *
-   * @param partition Partition to add; must be a ByteArrayPartition
-   * @throws IllegalStateException if partition is of another type
-   */
-  @Override
-  public void addPartition(Partition<I, V, E, M> partition) {
-    // Only work with other ByteArrayPartition instances
-    if (!(partition instanceof ByteArrayPartition)) {
-      throw new IllegalStateException("addPartition: Cannot add partition " +
-          "of type " + partition.getClass());
-    }
-    ByteArrayPartition<I, V, E, M> byteArrayPartition =
-        (ByteArrayPartition<I, V, E, M>) partition;
-    // Bytes are copied without deserializing; assumes both partitions use
-    // the same serialization setting (same configuration).
-    vertexMap.putAll(byteArrayPartition.vertexMap);
-  }
-
-  @Override
-  public long getVertexCount() {
-    return vertexMap.size();
-  }
-
-  /**
-   * Count edges by deserializing every vertex (overwrites the shared
-   * representative vertex).
-   *
-   * @return Total number of edges in this partition
-   */
-  @Override
-  public long getEdgeCount() {
-    long edges = 0;
-    for (byte[] vertexData : vertexMap.values()) {
-      edges += readRepresentativeVertex(vertexData).getNumEdges();
-    }
-    return edges;
-  }
-
-  @Override
-  public int getId() {
-    return id;
-  }
-
-  @Override
-  public void setId(int id) {
-    this.id = id;
-  }
-
-  @Override
-  public void setProgressable(Progressable progressable) {
-    this.progressable = progressable;
-  }
-
-  @Override
-  public void saveVertex(Vertex<I, V, E, M> vertex) {
-    // Reuse the old buffer whenever possible
-    byte[] oldVertexData = vertexMap.get(vertex.getId());
-    byte[] newVertexData = oldVertexData != null ?
-        WritableUtils.writeToByteArrayWithSize(
-            vertex, oldVertexData, useUnsafeSerialization) :
-        WritableUtils.writeToByteArrayWithSize(
-            vertex, useUnsafeSerialization);
-    vertexMap.put(vertex.getId(), newVertexData);
-  }
-
-  @Override
-  public void write(DataOutput output) throws IOException {
-    output.writeInt(id);
-    output.writeInt(vertexMap.size());
-    for (Map.Entry<I, byte[]> entry : vertexMap.entrySet()) {
-      if (progressable != null) {
-        progressable.progress();
-      }
-      entry.getKey().write(output);
-      // The size of the vertex data is encoded in the first four bytes of
-      // the byte[]. Write it first, then only that many bytes, since a
-      // buffer reused by saveVertex() may presumably be longer than the
-      // data it holds.
-      byte[] vertexData = entry.getValue();
-      int vertexDataSize = useUnsafeSerialization ?
-          UnsafeByteArrayInputStream.getInt(vertexData, 0) :
-          Ints.fromByteArray(vertexData);
-      output.writeInt(vertexDataSize);
-      output.write(vertexData, 0, vertexDataSize);
-    }
-  }
-
-  @Override
-  public void readFields(DataInput input) throws IOException {
-    id = input.readInt();
-    int size = input.readInt();
-    vertexMap = new MapMaker().concurrencyLevel(
-        conf.getNettyServerExecutionConcurrency()).initialCapacity(
-        size).makeMap();
-    representativeVertex = conf.createVertex();
-    useUnsafeSerialization = conf.useUnsafeSerialization();
-    for (int i = 0; i < size; ++i) {
-      if (progressable != null) {
-        progressable.progress();
-      }
-      I vertexId = conf.createVertexId();
-      vertexId.readFields(input);
-      int vertexDataSize = input.readInt();
-      byte[] vertexData = new byte[vertexDataSize];
-      input.readFully(vertexData);
-      if (vertexMap.put(vertexId, vertexData) != null) {
-        throw new IllegalStateException("readFields: Already saw vertex " +
-            vertexId);
-      }
-    }
-  }
-
-  @Override
-  public void setConf(
-      ImmutableClassesGiraphConfiguration<I, V, E, M> configuration) {
-    conf = configuration;
-  }
-
-  @Override
-  public ImmutableClassesGiraphConfiguration<I, V, E, M> getConf() {
-    return conf;
-  }
-
-  @Override
-  public Iterator<Vertex<I, V, E, M>> iterator() {
-    return new RepresentativeVertexIterator();
-  }
-
-  /**
-   * Iterator that deserializes a vertex from a byte array on the fly, using
-   * the same representative vertex object for every element.
-   */
-  private class RepresentativeVertexIterator implements
-      Iterator<Vertex<I, V, E, M>> {
-    /** Iterator to the vertex values */
-    private Iterator<byte[]> vertexDataIterator =
-        vertexMap.values().iterator();
-
-    @Override
-    public boolean hasNext() {
-      return vertexDataIterator.hasNext();
-    }
-
-    @Override
-    public Vertex<I, V, E, M> next() {
-      return readRepresentativeVertex(vertexDataIterator.next());
-    }
-
-    /**
-     * Removal is not supported.
-     *
-     * @throws UnsupportedOperationException always, as the Iterator
-     *     contract requires for iterators without remove support
-     */
-    @Override
-    public void remove() {
-      throw new UnsupportedOperationException(
-          "remove: This method is not supported.");
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/partition/DiskBackedPartitionStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/DiskBackedPartitionStore.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/DiskBackedPartitionStore.java
deleted file mode 100644
index 1e483d3..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/partition/DiskBackedPartitionStore.java
+++ /dev/null
@@ -1,364 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph.partition;
-
-import org.apache.giraph.conf.GiraphConstants;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.Vertex;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.log4j.Logger;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-/**
- * A partition store that can possibly spill to disk.
- *
- * New partitions are kept in {@code inMemoryPartitions} only while
- * {@code inMemoryPartitions.size() + 1 < maxInMemoryPartitions}; any further
- * partitions are written to files under {@code basePath}. At most one
- * out-of-core partition at a time is paged back in through the
- * {@code loadedPartition} slot.
- *
- * @param <I> Vertex id
- * @param <V> Vertex data
- * @param <E> Edge data
- * @param <M> Message data
- */
-public class DiskBackedPartitionStore<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable>
- extends PartitionStore<I, V, E, M> {
- /** Class logger. */
- private static final Logger LOG =
- Logger.getLogger(DiskBackedPartitionStore.class);
- /** Map of partitions kept in memory. */
- private final ConcurrentMap<Integer, Partition<I, V, E, M>>
- inMemoryPartitions = new ConcurrentHashMap<Integer, Partition<I, V, E, M>>();
- /**
- * Maximum number of partitions to keep in memory. The in-memory map is
- * filled only up to {@code maxInMemoryPartitions - 1} entries; the
- * remaining slot is the one used by {@code loadedPartition}.
- */
- private final int maxInMemoryPartitions;
- /**
- * Map of partitions kept out-of-core. The values are the number of
- * vertices serialized in each partition file.
- */
- private final ConcurrentMap<Integer, Integer> onDiskPartitions =
- Maps.newConcurrentMap();
- /** Directory on the local file system for storing out-of-core partitions. */
- private final String basePath;
- /** Configuration. */
- private final ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
- /** Slot for loading out-of-core partitions. */
- private Partition<I, V, E, M> loadedPartition;
- /** Locks for accessing and modifying partitions. */
- private final ConcurrentMap<Integer, Lock> partitionLocks =
- Maps.newConcurrentMap();
- /** Context used to report progress */
- private final Mapper<?, ?, ?, ?>.Context context;
-
- /**
- * Constructor.
- *
- * @param conf Configuration
- * @param context Mapper context
- */
- public DiskBackedPartitionStore(
- ImmutableClassesGiraphConfiguration<I, V, E, M> conf,
- Mapper<?, ?, ?, ?>.Context context) {
- this.conf = conf;
- this.context = context;
- // We must be able to hold at least one partition in memory
- maxInMemoryPartitions = Math.max(1,
- conf.getInt(GiraphConstants.MAX_PARTITIONS_IN_MEMORY,
- GiraphConstants.MAX_PARTITIONS_IN_MEMORY_DEFAULT));
- basePath = conf.get("mapred.job.id", "Unknown Job") +
- conf.get(GiraphConstants.PARTITIONS_DIRECTORY,
- GiraphConstants.PARTITIONS_DIRECTORY_DEFAULT);
- }
-
- /**
- * Get the path to the file where a partition is stored.
- *
- * @param partitionId The partition
- * @return The path to the given partition
- */
- private String getPartitionPath(Integer partitionId) {
- return basePath + "/partition-" + partitionId;
- }
-
- /**
- * Create a new lock for a partition, lock it, and return it. If a lock
- * already exists for the partition, return null.
- *
- * @param partitionId Partition id
- * @return A newly created, already-held lock, or null if already present
- */
- private Lock createLock(Integer partitionId) {
- Lock lock = new ReentrantLock(true);
- lock.lock();
- if (partitionLocks.putIfAbsent(partitionId, lock) != null) {
- return null;
- }
- return lock;
- }
-
- /**
- * Get the lock for a partition id.
- *
- * @param partitionId Partition id
- * @return The lock, or null if none is registered
- */
- private Lock getLock(Integer partitionId) {
- return partitionLocks.get(partitionId);
- }
-
- /**
- * Serialize every vertex of a partition to a file stream. The stream is
- * always closed, even when serialization fails.
- *
- * @param partition Partition whose vertices are written
- * @param fileStream Destination stream; ownership passes to this method
- * @throws IOException if writing or closing fails
- */
- private void writeVertices(Partition<I, V, E, M> partition,
- FileOutputStream fileStream) throws IOException {
- DataOutputStream outputStream = new DataOutputStream(
- new BufferedOutputStream(fileStream));
- try {
- for (Vertex<I, V, E, M> vertex : partition) {
- vertex.write(outputStream);
- }
- } finally {
- outputStream.close();
- }
- }
-
- /**
- * Write a partition to disk, replacing any existing file for it.
- *
- * @param partition The partition object to write
- * @throws java.io.IOException
- */
- private void writePartition(Partition<I, V, E, M> partition)
- throws IOException {
- File file = new File(getPartitionPath(partition.getId()));
- file.getParentFile().mkdirs();
- file.createNewFile();
- writeVertices(partition, new FileOutputStream(file));
- }
-
- /**
- * Read a partition from disk and delete its file afterwards.
- *
- * @param partitionId Id of the partition to read
- * @return The partition object
- * @throws IOException
- */
- private Partition<I, V, E, M> readPartition(Integer partitionId)
- throws IOException {
- Partition<I, V, E, M> partition =
- conf.createPartition(partitionId, context);
- File file = new File(getPartitionPath(partitionId));
- DataInputStream inputStream = new DataInputStream(
- new BufferedInputStream(new FileInputStream(file)));
- try {
- // The vertex count was recorded when the file was written/appended.
- int numVertices = onDiskPartitions.get(partitionId);
- for (int i = 0; i < numVertices; ++i) {
- Vertex<I, V, E, M> vertex = conf.createVertex();
- vertex.readFields(inputStream);
- partition.putVertex(vertex);
- }
- } finally {
- inputStream.close();
- }
- if (!file.delete()) {
- LOG.warn("readPartition: failed to delete file " + file +
- " of partition " + partitionId);
- }
- return partition;
- }
-
- /**
- * Append some vertices of another partition to an out-of-core partition.
- *
- * @param partition Partition to add
- * @throws IOException
- */
- private void appendPartitionOutOfCore(Partition<I, V, E, M> partition)
- throws IOException {
- File file = new File(getPartitionPath(partition.getId()));
- writeVertices(partition, new FileOutputStream(file, true));
- }
-
- /**
- * Load an out-of-core partition in memory, first spilling the currently
- * loaded partition (if any, and if different) back to disk.
- *
- * @param partitionId Partition id
- */
- private void loadPartition(Integer partitionId) {
- if (loadedPartition != null) {
- if (loadedPartition.getId() == partitionId) {
- return;
- }
- if (LOG.isInfoEnabled()) {
- LOG.info("loadPartition: moving partition " + loadedPartition.getId() +
- " out of core with size " + loadedPartition.getVertexCount());
- }
- try {
- writePartition(loadedPartition);
- onDiskPartitions.put(loadedPartition.getId(),
- (int) loadedPartition.getVertexCount());
- loadedPartition = null;
- } catch (IOException e) {
- throw new IllegalStateException("loadPartition: failed writing " +
- "partition " + loadedPartition.getId() + " to disk", e);
- }
- }
- if (LOG.isInfoEnabled()) {
- LOG.info("loadPartition: loading partition " + partitionId +
- " in memory");
- }
- try {
- loadedPartition = readPartition(partitionId);
- } catch (IOException e) {
- throw new IllegalStateException("loadPartition: failed reading " +
- "partition " + partitionId + " from disk", e);
- }
- }
-
- /**
- * Add a new partition without requiring a lock. It is kept in memory if
- * there is room, otherwise written to disk.
- *
- * @param partition Partition to be added
- */
- private void addPartitionNoLock(Partition<I, V, E, M> partition) {
- synchronized (inMemoryPartitions) {
- if (inMemoryPartitions.size() + 1 < maxInMemoryPartitions) {
- inMemoryPartitions.put(partition.getId(), partition);
-
- return;
- }
- }
- try {
- writePartition(partition);
- onDiskPartitions.put(partition.getId(),
- (int) partition.getVertexCount());
- } catch (IOException e) {
- throw new IllegalStateException("addPartition: failed writing " +
- "partition " + partition.getId() + " to disk", e);
- }
- }
-
- @Override
- public void addPartition(Partition<I, V, E, M> partition) {
- if (inMemoryPartitions.containsKey(partition.getId())) {
- Partition<I, V, E, M> existingPartition =
- inMemoryPartitions.get(partition.getId());
- existingPartition.addPartition(partition);
- } else if (onDiskPartitions.containsKey(partition.getId())) {
- Lock lock = getLock(partition.getId());
- lock.lock();
- // Release in finally so a failed append cannot leave the lock held.
- try {
- if (loadedPartition != null && loadedPartition.getId() ==
- partition.getId()) {
- loadedPartition.addPartition(partition);
- } else {
- try {
- appendPartitionOutOfCore(partition);
- onDiskPartitions.put(partition.getId(),
- onDiskPartitions.get(partition.getId()) +
- (int) partition.getVertexCount());
- } catch (IOException e) {
- throw new IllegalStateException("addPartition: failed " +
- "writing vertices to partition " + partition.getId() +
- " on disk", e);
- }
- }
- } finally {
- lock.unlock();
- }
- } else {
- Lock lock = createLock(partition.getId());
- if (lock != null) {
- try {
- addPartitionNoLock(partition);
- } finally {
- lock.unlock();
- }
- } else {
- // Another thread is already creating the partition,
- // so we make sure it's done before repeating the call.
- lock = getLock(partition.getId());
- // The lock may have been removed concurrently; then just retry.
- if (lock != null) {
- lock.lock();
- lock.unlock();
- }
- addPartition(partition);
- }
- }
- }
-
- @Override
- public Partition<I, V, E, M> getPartition(Integer partitionId) {
- if (inMemoryPartitions.containsKey(partitionId)) {
- return inMemoryPartitions.get(partitionId);
- } else if (onDiskPartitions.containsKey(partitionId)) {
- loadPartition(partitionId);
- return loadedPartition;
- } else {
- throw new IllegalStateException("getPartition: partition " +
- partitionId + " does not exist");
- }
- }
-
- @Override
- public Partition<I, V, E, M> removePartition(Integer partitionId) {
- partitionLocks.remove(partitionId);
- if (onDiskPartitions.containsKey(partitionId)) {
- Partition<I, V, E, M> partition;
- if (loadedPartition != null && loadedPartition.getId() == partitionId) {
- partition = loadedPartition;
- loadedPartition = null;
- } else {
- try {
- partition = readPartition(partitionId);
- } catch (IOException e) {
- throw new IllegalStateException("removePartition: failed reading " +
- "partition " + partitionId + " from disk", e);
- }
- }
- onDiskPartitions.remove(partitionId);
- return partition;
- } else {
- return inMemoryPartitions.remove(partitionId);
- }
- }
-
- @Override
- public void deletePartition(Integer partitionId) {
- partitionLocks.remove(partitionId);
- if (inMemoryPartitions.containsKey(partitionId)) {
- inMemoryPartitions.remove(partitionId);
- } else {
- if (loadedPartition != null && loadedPartition.getId() == partitionId) {
- loadedPartition = null;
- } else {
- File file = new File(getPartitionPath(partitionId));
- file.delete();
- }
- onDiskPartitions.remove(partitionId);
- }
- }
-
- @Override
- public boolean hasPartition(Integer partitionId) {
- return partitionLocks.containsKey(partitionId);
- }
-
- @Override
- public Iterable<Integer> getPartitionIds() {
- return Iterables.concat(inMemoryPartitions.keySet(),
- onDiskPartitions.keySet());
- }
-
- @Override
- public int getNumPartitions() {
- return partitionLocks.size();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/partition/GraphPartitionerFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/GraphPartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/GraphPartitionerFactory.java
deleted file mode 100644
index a7ac84b..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/partition/GraphPartitionerFactory.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph.partition;
-
-import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * Factory supplying the partitioning strategy of an application: a
- * partitioner for the master and a partitioner for the workers.
- *
- * @param <I> Vertex index value
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message value
- */
-@SuppressWarnings("rawtypes")
-public interface GraphPartitionerFactory<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable>
- extends ImmutableClassesGiraphConfigurable {
- /**
- * Build the {@link WorkerGraphPartitioner} for a worker. Every worker
- * instantiates it once and then keeps reusing it.
- *
- * @return Newly instantiated worker graph partitioner
- */
- WorkerGraphPartitioner<I, V, E, M> createWorkerGraphPartitioner();
-
- /**
- * Build the {@link MasterGraphPartitioner} for the master. The master
- * instantiates it once and then keeps reusing it.
- *
- * @return Newly instantiated master graph partitioner
- */
- MasterGraphPartitioner<I, V, E, M> createMasterGraphPartitioner();
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/partition/HashMasterPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/HashMasterPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/HashMasterPartitioner.java
deleted file mode 100644
index 6724e50..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/partition/HashMasterPartitioner.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph.partition;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.WorkerInfo;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.log4j.Logger;
-
-/**
- * Master will execute a hash based partitioning.
- *
- * @param <I> Vertex index value
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message value
- */
-@SuppressWarnings("rawtypes")
-public class HashMasterPartitioner<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable> implements
- MasterGraphPartitioner<I, V, E, M> {
- /** Multiplier for the current workers squared */
- public static final String PARTITION_COUNT_MULTIPLIER =
- "hash.masterPartitionCountMultipler";
- /** Default multiplier for current workers squared */
- public static final float DEFAULT_PARTITION_COUNT_MULTIPLIER = 1.0f;
- /** Overrides default partition count calculation if not -1 */
- public static final String USER_PARTITION_COUNT =
- "hash.userPartitionCount";
- /** Default user partition count */
- public static final int DEFAULT_USER_PARTITION_COUNT = -1;
- /** Class logger */
- private static final Logger LOG =
- Logger.getLogger(HashMasterPartitioner.class);
- /**
- * ZooKeeper limits the data in a single znode to 1 MB, and each
- * partition owner entry takes on average somewhat more than 300 bytes.
- */
- private static final int MAX_PARTITIONS = 1024 * 1024 / 350;
- /** Provided configuration */
- private final ImmutableClassesGiraphConfiguration conf;
- /** Specified partition count (overrides calculation) */
- private final int userPartitionCount;
- /** Partition count (calculated in createInitialPartitionOwners) */
- private int partitionCount = -1;
- /** Save the last generated partition owner list */
- private List<PartitionOwner> partitionOwnerList;
-
- /**
- * Constructor.
- *
- * @param conf Configuration used.
- */
- public HashMasterPartitioner(ImmutableClassesGiraphConfiguration conf) {
- this.conf = conf;
- userPartitionCount = conf.getInt(USER_PARTITION_COUNT,
- DEFAULT_USER_PARTITION_COUNT);
- }
-
- /**
- * Create the initial partition owners. Unless a user partition count is
- * configured, the count is {@code multiplier * workers^2} (at least 1),
- * capped at {@link #MAX_PARTITIONS}. Partitions are assigned to the
- * available workers round-robin.
- *
- * @param availableWorkerInfos Workers that can own partitions
- * @param maxWorkers Maximum number of workers (unused here)
- * @return The generated partition owners
- * @throws IllegalArgumentException if no workers are available
- */
- @Override
- public Collection<PartitionOwner> createInitialPartitionOwners(
- Collection<WorkerInfo> availableWorkerInfos, int maxWorkers) {
- if (availableWorkerInfos.isEmpty()) {
- throw new IllegalArgumentException(
- "createInitialPartitionOwners: No available workers");
- }
- List<PartitionOwner> ownerList = new ArrayList<PartitionOwner>();
- Iterator<WorkerInfo> workerIt = availableWorkerInfos.iterator();
- if (userPartitionCount == DEFAULT_USER_PARTITION_COUNT) {
- float multiplier = conf.getFloat(
- PARTITION_COUNT_MULTIPLIER,
- DEFAULT_PARTITION_COUNT_MULTIPLIER);
- partitionCount =
- Math.max((int) (multiplier * availableWorkerInfos.size() *
- availableWorkerInfos.size()),
- 1);
- } else {
- partitionCount = userPartitionCount;
- }
- if (LOG.isInfoEnabled()) {
- LOG.info("createInitialPartitionOwners: Creating " +
- partitionCount + ", default would have been " +
- (availableWorkerInfos.size() *
- availableWorkerInfos.size()) + " partitions.");
- }
- if (partitionCount > MAX_PARTITIONS) {
- LOG.warn("createInitialPartitionOwners: " +
- "Reducing the partitionCount to " + MAX_PARTITIONS +
- " from " + partitionCount);
- partitionCount = MAX_PARTITIONS;
- }
-
- // Round-robin assignment: restart the worker iterator once exhausted.
- for (int i = 0; i < partitionCount; ++i) {
- PartitionOwner owner = new BasicPartitionOwner(i, workerIt.next());
- if (!workerIt.hasNext()) {
- workerIt = availableWorkerInfos.iterator();
- }
- ownerList.add(owner);
- }
- this.partitionOwnerList = ownerList;
- return ownerList;
- }
-
- @Override
- public Collection<PartitionOwner> getCurrentPartitionOwners() {
- return partitionOwnerList;
- }
-
- /**
- * Subclasses can set the partition owner list.
- *
- * @param partitionOwnerList New partition owner list.
- */
- protected void setPartitionOwnerList(List<PartitionOwner>
- partitionOwnerList) {
- this.partitionOwnerList = partitionOwnerList;
- }
-
- @Override
- public Collection<PartitionOwner> generateChangedPartitionOwners(
- Collection<PartitionStats> allPartitionStatsList,
- Collection<WorkerInfo> availableWorkerInfos,
- int maxWorkers,
- long superstep) {
- return PartitionBalancer.balancePartitionsAcrossWorkers(
- conf,
- partitionOwnerList,
- allPartitionStatsList,
- availableWorkerInfos);
- }
-
- @Override
- public PartitionStats createPartitionStats() {
- return new PartitionStats();
- }
-}