You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by tj...@apache.org on 2013/02/21 07:38:36 UTC
svn commit: r1448523 [1/4] - in /hama/trunk: ./
core/src/main/java/org/apache/hama/ core/src/main/java/org/apache/hama/bsp/
core/src/main/java/org/apache/hama/bsp/ft/
core/src/main/java/org/apache/hama/bsp/message/compress/
core/src/main/java/org/apach...
Author: tjungblut
Date: Thu Feb 21 06:38:33 2013
New Revision: 1448523
URL: http://svn.apache.org/r1448523
Log:
[HAMA-735]: Tighten the graph API
Removed:
hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java
Modified:
hama/trunk/CHANGES.txt
hama/trunk/core/src/main/java/org/apache/hama/HamaConfiguration.java
hama/trunk/core/src/main/java/org/apache/hama/HamaVersionAnnotation.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSP.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPInterface.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobID.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileInputFormat.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileSplit.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/Counters.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/Directive.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/FileOutputFormat.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerAction.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerStatus.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgressListener.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/QueueManager.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/RecoverTaskAction.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskCompletionEvent.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/TextArrayWritable.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/UpdatePeerAction.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/ft/AsyncRcvdMsgCheckpointImpl.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/ft/BSPFaultTolerantService.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/BSPCompressedBundle.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/BSPMessageCompressor.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/BufferedReadStatus.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/ByteBufferInputStream.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/ByteBufferOutputStream.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/CombineSpilledDataProcessor.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/DirectByteBufferOutputStream.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/DualChannelByteBufferStream.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/DuplexByteArrayChannel.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/PreFetchCache.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/ReadIndexStatus.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledByteBuffer.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataReadStatus.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpillingDataOutputBuffer.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SyncFlushByteBufferOutputStream.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SyncReadByteBufferInputStream.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/WriteSpilledDataProcessor.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageTransferQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SynchronizedQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/BSPMasterSyncClient.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/BSPPeerSyncClient.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/MasterSyncClient.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/PeerSyncClient.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncClient.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncEvent.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncEventListener.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncException.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncServerRunner.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncServiceFactory.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncClient.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncEventFactory.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncEventListener.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncServerImpl.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/taskallocation/BestEffortDataLocalTaskAllocator.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/taskallocation/RawSplitResource.java
hama/trunk/core/src/main/java/org/apache/hama/http/HttpServer.java
hama/trunk/core/src/main/java/org/apache/hama/ipc/BSPPeerProtocol.java
hama/trunk/core/src/main/java/org/apache/hama/ipc/GroomProtocol.java
hama/trunk/core/src/main/java/org/apache/hama/ipc/MasterProtocol.java
hama/trunk/core/src/main/java/org/apache/hama/monitor/Configurator.java
hama/trunk/core/src/main/java/org/apache/hama/monitor/MetricsRecord.java
hama/trunk/core/src/main/java/org/apache/hama/monitor/MetricsTag.java
hama/trunk/core/src/main/java/org/apache/hama/monitor/Monitor.java
hama/trunk/core/src/main/java/org/apache/hama/monitor/MonitorListener.java
hama/trunk/core/src/main/java/org/apache/hama/monitor/ZKCollector.java
hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/NodeEventListener.java
hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/NodeStatus.java
hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Sensor.java
hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/Supervisor.java
hama/trunk/core/src/main/java/org/apache/hama/monitor/fd/UDPSensor.java
hama/trunk/core/src/main/java/org/apache/hama/pipes/BinaryProtocol.java
hama/trunk/core/src/main/java/org/apache/hama/pipes/Submitter.java
hama/trunk/core/src/main/java/org/apache/hama/util/BSPNetUtils.java
hama/trunk/core/src/main/java/org/apache/hama/util/BSPServletUtil.java
hama/trunk/core/src/main/java/org/apache/hama/util/Bytes.java
hama/trunk/core/src/main/java/org/apache/hama/util/ClusterUtil.java
hama/trunk/core/src/main/java/org/apache/hama/util/LRUCache.java
hama/trunk/core/src/main/java/org/apache/hama/util/RandomVariable.java
hama/trunk/core/src/main/java/org/apache/hama/util/ReflectionUtils.java
hama/trunk/core/src/main/java/org/apache/hama/util/RunJar.java
hama/trunk/core/src/main/java/org/apache/hama/util/StringArrayWritable.java
hama/trunk/core/src/main/java/org/apache/hama/util/VersionInfo.java
hama/trunk/core/src/main/java/org/apache/hama/zookeeper/ZKServerTool.java
hama/trunk/core/src/test/java/org/apache/hama/HamaCluster.java
hama/trunk/core/src/test/java/org/apache/hama/HamaClusterTestCase.java
hama/trunk/core/src/test/java/org/apache/hama/HamaTestCase.java
hama/trunk/core/src/test/java/org/apache/hama/MiniBSPCluster.java
hama/trunk/core/src/test/java/org/apache/hama/MiniZooKeeperCluster.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestClusterStatus.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestFileInputFormat.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestLocalRunner.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestSpillingQueue.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/message/compress/TestBSPMessageCompressor.java
hama/trunk/core/src/test/java/org/apache/hama/examples/ClassSerializePrinting.java
hama/trunk/core/src/test/java/org/apache/hama/ipc/TestIPC.java
hama/trunk/core/src/test/java/org/apache/hama/ipc/TestRPC.java
hama/trunk/core/src/test/java/org/apache/hama/monitor/TestConfigurator.java
hama/trunk/core/src/test/java/org/apache/hama/monitor/fd/TestFD.java
hama/trunk/core/src/test/java/org/apache/hama/util/TestBytes.java
hama/trunk/core/src/test/java/org/apache/hama/util/TestRandomVariable.java
hama/trunk/core/src/test/java/org/apache/hama/zookeeper/TestZKTools.java
hama/trunk/examples/src/main/java/org/apache/hama/examples/BipartiteMatching.java
hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java
hama/trunk/examples/src/main/java/org/apache/hama/examples/GradientDescentExample.java
hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java
hama/trunk/examples/src/main/java/org/apache/hama/examples/Kmeans.java
hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java
hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java
hama/trunk/examples/src/main/java/org/apache/hama/examples/SpMV.java
hama/trunk/examples/src/main/java/org/apache/hama/examples/util/DenseVectorWritable.java
hama/trunk/examples/src/main/java/org/apache/hama/examples/util/Generator.java
hama/trunk/examples/src/main/java/org/apache/hama/examples/util/SparseVectorWritable.java
hama/trunk/examples/src/main/java/org/apache/hama/examples/util/SymmetricMatrixGen.java
hama/trunk/examples/src/test/java/org/apache/hama/examples/CombineExampleTest.java
hama/trunk/examples/src/test/java/org/apache/hama/examples/LinearRegressionTest.java
hama/trunk/examples/src/test/java/org/apache/hama/examples/LogisticRegressionTest.java
hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java
hama/trunk/examples/src/test/java/org/apache/hama/examples/PiEstimatorTest.java
hama/trunk/examples/src/test/java/org/apache/hama/examples/RandBenchTest.java
hama/trunk/examples/src/test/java/org/apache/hama/examples/SSSPTest.java
hama/trunk/examples/src/test/java/org/apache/hama/examples/SymmetricMatrixGenTest.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/AbstractAggregator.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/Edge.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInputReader.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java
hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java
Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Thu Feb 21 06:38:33 2013
@@ -18,6 +18,7 @@ Release 0.7 (unreleased changes)
IMPROVEMENTS
+ HAMA-735: Tighten the graph API (tjungblut)
HAMA-714: Align format consistency between examples and generators (edwardyoon)
HAMA-531: Reimplementation of partitioner (edwardyoon)
Modified: hama/trunk/core/src/main/java/org/apache/hama/HamaConfiguration.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/HamaConfiguration.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/HamaConfiguration.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/HamaConfiguration.java Thu Feb 21 06:38:33 2013
@@ -38,7 +38,7 @@ public class HamaConfiguration extends C
super();
this.addResource(confFile);
}
-
+
/**
* Create a clone of passed configuration.
*
Modified: hama/trunk/core/src/main/java/org/apache/hama/HamaVersionAnnotation.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/HamaVersionAnnotation.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/HamaVersionAnnotation.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/HamaVersionAnnotation.java Thu Feb 21 06:38:33 2013
@@ -23,18 +23,17 @@ import java.lang.annotation.RetentionPol
import java.lang.annotation.Target;
/**
- * An attribute that
- * holds the information on version and build information of a
+ * An attribute that holds the information on version and build information of a
* Hama package that a potential user is working with.
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.PACKAGE)
-
public @interface HamaVersionAnnotation {
/**
* Get the Hama version
+ *
* @return the version string "0.3.3-dev"
*/
String version();
@@ -46,6 +45,7 @@ public @interface HamaVersionAnnotation
/**
* Get the date when Hama was compiled.
+ *
* @return the date in unix 'date' format
*/
String date();
@@ -57,23 +57,23 @@ public @interface HamaVersionAnnotation
/**
* Get the subversion revision.
+ *
* @return the revision number as a string (eg. "451451")
*/
String revision();
/**
* Get the branch from which this was compiled.
+ *
* @return The branch name, e.g. "trunk" or "branches/branch-0.20"
*/
String branch();
/**
- * Get a checksum of the source files from which
- * Hama was compiled.
+ * Get a checksum of the source files from which Hama was compiled.
+ *
* @return a string that uniquely identifies the source
**/
- String srcChecksum();
-
-
+ String srcChecksum();
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSP.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSP.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSP.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSP.java Thu Feb 21 06:38:33 2013
@@ -28,27 +28,27 @@ import org.apache.hama.bsp.sync.SyncExce
public abstract class BSP<K1, V1, K2, V2, M extends Writable> implements
BSPInterface<K1, V1, K2, V2, M> {
- /**
- * {@inheritDoc}
- */
- @Override
- public abstract void bsp(BSPPeer<K1, V1, K2, V2, M> peer) throws IOException,
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public abstract void bsp(BSPPeer<K1, V1, K2, V2, M> peer) throws IOException,
SyncException, InterruptedException;
- /**
- * {@inheritDoc}
- */
- @Override
- public void setup(BSPPeer<K1, V1, K2, V2, M> peer) throws IOException,
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void setup(BSPPeer<K1, V1, K2, V2, M> peer) throws IOException,
SyncException, InterruptedException {
}
- /**
- * {@inheritDoc}
- */
- @Override
- public void cleanup(BSPPeer<K1, V1, K2, V2, M> peer) throws IOException {
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void cleanup(BSPPeer<K1, V1, K2, V2, M> peer) throws IOException {
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPInterface.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPInterface.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPInterface.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPInterface.java Thu Feb 21 06:38:33 2013
@@ -23,42 +23,42 @@ import org.apache.hadoop.io.Writable;
import org.apache.hama.bsp.sync.SyncException;
/**
- * The {@link BSPInterface} defines the basic operations needed to implement a BSP
- * based algorithm.
- * The implementing algorithm takes {@link BSPPeer}s as parameters which are
- * responsible for communication, reading K1-V1 inputs, collecting k2-V2 outputs
- * and exchanging messages of type M.
+ * The {@link BSPInterface} defines the basic operations needed to implement a
+ * BSP based algorithm. The implementing algorithm takes {@link BSPPeer}s as
+ * parameters which are responsible for communication, reading K1-V1 inputs,
+ * collecting k2-V2 outputs and exchanging messages of type M.
*/
public interface BSPInterface<K1, V1, K2, V2, M extends Writable> {
- /**
- * This method is your computation method, the main work of your BSP should be
- * done here.
- *
- * @param peer Your BSPPeer instance.
- * @throws java.io.IOException
- * @throws org.apache.hama.bsp.sync.SyncException
- * @throws InterruptedException
- */
- public void bsp(BSPPeer<K1, V1, K2, V2, M> peer) throws IOException, SyncException, InterruptedException;
+ /**
+ * This method is your computation method, the main work of your BSP should be
+ * done here.
+ *
+ * @param peer Your BSPPeer instance.
+ * @throws java.io.IOException
+ * @throws org.apache.hama.bsp.sync.SyncException
+ * @throws InterruptedException
+ */
+ public void bsp(BSPPeer<K1, V1, K2, V2, M> peer) throws IOException,
+ SyncException, InterruptedException;
- /**
- * This method is called before the BSP method. It can be used for setup
- * purposes.
- *
- * @param peer Your BSPPeer instance.
- * @throws IOException
- */
- public void setup(BSPPeer<K1, V1, K2, V2, M> peer) throws IOException,
- SyncException, InterruptedException;
+ /**
+ * This method is called before the BSP method. It can be used for setup
+ * purposes.
+ *
+ * @param peer Your BSPPeer instance.
+ * @throws IOException
+ */
+ public void setup(BSPPeer<K1, V1, K2, V2, M> peer) throws IOException,
+ SyncException, InterruptedException;
- /**
- * This method is called after the BSP method. It can be used for cleanup
- * purposes. Cleanup is guranteed to be called after the BSP runs, even in
- * case of exceptions.
- *
- * @param peer Your BSPPeer instance.
- * @throws IOException
- */
- public void cleanup(BSPPeer<K1, V1, K2, V2, M> peer) throws IOException;
+ /**
+ * This method is called after the BSP method. It can be used for cleanup
+ * purposes. Cleanup is guranteed to be called after the BSP runs, even in
+ * case of exceptions.
+ *
+ * @param peer Your BSPPeer instance.
+ * @throws IOException
+ */
+ public void cleanup(BSPPeer<K1, V1, K2, V2, M> peer) throws IOException;
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java Thu Feb 21 06:38:33 2013
@@ -252,8 +252,9 @@ public class BSPJob extends BSPJobContex
@SuppressWarnings({ "rawtypes" })
public InputFormat getInputFormat() {
- return ReflectionUtils.newInstance(conf.getClass(Constants.INPUT_FORMAT_CLASS,
- TextInputFormat.class, InputFormat.class), conf);
+ return ReflectionUtils.newInstance(
+ conf.getClass(Constants.INPUT_FORMAT_CLASS, TextInputFormat.class,
+ InputFormat.class), conf);
}
@SuppressWarnings({ "rawtypes" })
@@ -374,19 +375,22 @@ public class BSPJob extends BSPJobContex
*/
@SuppressWarnings("rawtypes")
public void setPartitioner(Class<? extends Partitioner> theClass) {
- conf.setClass(Constants.RUNTIME_PARTITIONING_CLASS, theClass, Partitioner.class);
+ conf.setClass(Constants.RUNTIME_PARTITIONING_CLASS, theClass,
+ Partitioner.class);
}
@SuppressWarnings("rawtypes")
public Partitioner getPartitioner() {
- return ReflectionUtils.newInstance(
- conf.getClass(Constants.RUNTIME_PARTITIONING_CLASS, HashPartitioner.class,Partitioner.class), conf);
+ return ReflectionUtils.newInstance(conf.getClass(
+ Constants.RUNTIME_PARTITIONING_CLASS, HashPartitioner.class,
+ Partitioner.class), conf);
}
@SuppressWarnings("rawtypes")
public OutputFormat getOutputFormat() {
- return ReflectionUtils.newInstance(conf.getClass(Constants.OUTPUT_FORMAT_CLASS,
- TextOutputFormat.class, OutputFormat.class), conf);
+ return ReflectionUtils.newInstance(conf.getClass(
+ Constants.OUTPUT_FORMAT_CLASS, TextOutputFormat.class,
+ OutputFormat.class), conf);
}
protected void setCheckPointInterval(int checkPointInterval) {
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Thu Feb 21 06:38:33 2013
@@ -375,13 +375,13 @@ public class BSPJobClient extends Config
if (fs.isFile(inputDir)) {
inputDir = inputDir.getParent();
}
-
+
Path partitionDir = new Path(inputDir + "/partitions");
if (fs.exists(partitionDir)) {
fs.delete(partitionDir, true);
}
-
+
if (job.get("bsp.partitioning.runner.job") != null) {
return job;
}// Early exit for the partitioner job.
@@ -477,7 +477,9 @@ public class BSPJobClient extends Config
if (maxTasks < job.getNumBspTask()) {
throw new IOException(
- "Job failed! The number of tasks has exceeded the maximum allowed.");
+ "Job failed! The number of tasks has exceeded the maximum allowed. Maxtasks: "
+ + maxTasks + " < configured number of tasks: "
+ + job.getNumBspTask());
}
return maxTasks;
}
@@ -560,7 +562,8 @@ public class BSPJobClient extends Config
&& job.getConfiguration().get(Constants.RUNTIME_PARTITIONING_CLASS) != null
&& job.get("bsp.partitioning.runner.job") == null) {
LOG.debug(((FileSplit) split).getPath().getName());
- String[] extractPartitionID = ((FileSplit) split).getPath().getName().split("[-]");
+ String[] extractPartitionID = ((FileSplit) split).getPath().getName()
+ .split("[-]");
rawSplit.setPartitionID(Integer.parseInt(extractPartitionID[1]));
}
@@ -1122,9 +1125,9 @@ public class BSPJobClient extends Config
out.writeInt(partitionID);
bytes.write(out);
WritableUtils.writeVInt(out, locations.length);
- for (String location : locations) {
- Text.writeString(out, location);
- }
+ for (String location : locations) {
+ Text.writeString(out, location);
+ }
}
public long getDataLength() {
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobID.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobID.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobID.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobID.java Thu Feb 21 06:38:33 2013
@@ -18,7 +18,6 @@
package org.apache.hama.bsp;
import java.io.DataInput;
-
import java.io.DataOutput;
import java.io.IOException;
import java.text.NumberFormat;
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java Thu Feb 21 06:38:33 2013
@@ -65,15 +65,15 @@ import org.apache.zookeeper.Watcher;
* BSPMaster is responsible to control all the groom servers and to manage bsp
* jobs. It has the following responsibilities:
* <ol>
- * <li> <b>Job submission</b>. BSPMaster is responsible for accepting new job
+ * <li><b>Job submission</b>. BSPMaster is responsible for accepting new job
* requests and assigning the job to scheduler for scheduling BSP Tasks defined
* for the job.
- * <li> <b>GroomServer monitoring</b> BSPMaster keeps track of all the groom
- * servers in the cluster. It is responsible for adding new grooms to the
- * cluster and keeping a tab on all the grooms and could blacklist a groom if
- * it get fails the availability requirement.
- * <li> BSPMaster keeps track of all the task status for each job and handles
- * the failure of job as requested by the jobs.
+ * <li><b>GroomServer monitoring</b> BSPMaster keeps track of all the groom
+ * servers in the cluster. It is responsible for adding new grooms to the
+ * cluster and keeping a tab on all the grooms and could blacklist a groom if it
+ * get fails the availability requirement.
+ * <li>BSPMaster keeps track of all the task status for each job and handles the
+ * failure of job as requested by the jobs.
* </ol>
*/
public class BSPMaster implements JobSubmissionProtocol, MasterProtocol,
@@ -128,7 +128,8 @@ public class BSPMaster implements JobSub
// Jobs' Meta Data
private Integer nextJobId = 1;
- private int totalSubmissions = 0; // how many jobs has been submitted by clients
+ private int totalSubmissions = 0; // how many jobs has been submitted by
+ // clients
private int totalTasks = 0; // currnetly running tasks
private int totalTaskCapacity; // max tasks that groom server can run
@@ -148,11 +149,10 @@ public class BSPMaster implements JobSub
private final AtomicReference<Supervisor> supervisor = new AtomicReference<Supervisor>();
/**
- * ReportGroomStatusHandler keeps track of the status reported by each
- * Groomservers on the task they are executing currently. Based on the
- * status reported, it is responsible for issuing task recovery requests,
- * updating the job progress and other book keeping on currently running
- * jobs.
+ * ReportGroomStatusHandler keeps track of the status reported by each
+ * Groomservers on the task they are executing currently. Based on the status
+ * reported, it is responsible for issuing task recovery requests, updating
+ * the job progress and other book keeping on currently running jobs.
*/
private class ReportGroomStatusHandler implements DirectiveHandler {
@@ -190,10 +190,9 @@ public class BSPMaster implements JobSub
jip.getStatus().setProgress(ts.getSuperstepCount());
jip.getStatus().setSuperstepCount(ts.getSuperstepCount());
} else if (ts.getRunState() == TaskStatus.State.FAILED) {
- if(jip.handleFailure(tip)){
+ if (jip.handleFailure(tip)) {
recoverTask(jip);
- }
- else {
+ } else {
jip.status.setRunState(JobStatus.FAILED);
jip.failedTask(tip, ts);
}
@@ -210,7 +209,7 @@ public class BSPMaster implements JobSub
jip.getStatus().setProgress(ts.getSuperstepCount());
jip.getStatus().setSuperstepCount(ts.getSuperstepCount());
} else if (jip.getStatus().getRunState() == JobStatus.KILLED) {
-
+
GroomProtocol worker = findGroomServer(tmpStatus);
Directive d1 = new DispatchTasksDirective(
new GroomServerAction[] { new KillTaskAction(ts.getTaskId()) });
@@ -439,17 +438,17 @@ public class BSPMaster implements JobSub
void deleteLocalFiles() throws IOException {
String[] localDirs = getLocalDirs();
- for (String localDir : localDirs) {
- FileSystem.getLocal(conf).delete(new Path(localDir), true);
- }
+ for (String localDir : localDirs) {
+ FileSystem.getLocal(conf).delete(new Path(localDir), true);
+ }
}
void deleteLocalFiles(String subdir) throws IOException {
try {
String[] localDirs = getLocalDirs();
- for (String localDir : localDirs) {
- FileSystem.getLocal(conf).delete(new Path(localDir, subdir), true);
- }
+ for (String localDir : localDirs) {
+ FileSystem.getLocal(conf).delete(new Path(localDir, subdir), true);
+ }
} catch (NullPointerException e) {
LOG.info(e);
}
@@ -465,6 +464,7 @@ public class BSPMaster implements JobSub
/**
* Starts the BSP Master process.
+ *
* @param conf The Hama configuration.
* @return an instance of BSPMaster
* @throws IOException
@@ -477,6 +477,7 @@ public class BSPMaster implements JobSub
/**
* Starts the BSP Master process
+ *
* @param conf The Hama configuration
* @param identifier Identifier for the job.
* @return
@@ -500,6 +501,7 @@ public class BSPMaster implements JobSub
/**
* Initialize the global synchronization client.
+ *
* @param conf Hama configuration.
*/
private void initZK(HamaConfiguration conf) {
@@ -509,9 +511,10 @@ public class BSPMaster implements JobSub
/**
* Get a handle of the global synchronization client used by BSPMaster.
+ *
* @return The synchronization client.
*/
- public MasterSyncClient getSyncClient(){
+ public MasterSyncClient getSyncClient() {
return this.syncClient;
}
@@ -604,9 +607,9 @@ public class BSPMaster implements JobSub
JobInProgress job = new JobInProgress(jobID, new Path(jobFile), this,
this.conf);
++totalSubmissions;
- if(LOG.isDebugEnabled()){
- LOG.debug("Submitting job number = " + totalSubmissions +
- " id = " + job.getJobID());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Submitting job number = " + totalSubmissions + " id = "
+ + job.getJobID());
}
return addJob(jobID, job);
}
@@ -755,10 +758,10 @@ public class BSPMaster implements JobSub
}
return job.getStatus();
}
-
+
/**
- * Recovers task in job. To be called when a particular task in a job has failed
- * and there is a need to schedule it on a machine.
+ * Recovers task in job. To be called when a particular task in a job has
+ * failed and there is a need to schedule it on a machine.
*/
private synchronized void recoverTask(JobInProgress job) {
++totalSubmissions;
@@ -893,7 +896,7 @@ public class BSPMaster implements JobSub
try {
this.syncClient.close();
} catch (IOException e) {
- LOG.error("Error closing the sync client",e);
+ LOG.error("Error closing the sync client", e);
}
if (null != this.supervisor.get()) {
this.supervisor.get().stop();
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Thu Feb 21 06:38:33 2013
@@ -58,7 +58,10 @@ public final class BSPPeerImpl<K1, V1, K
private static final Log LOG = LogFactory.getLog(BSPPeerImpl.class);
public static enum PeerCounter {
- COMPRESSED_MESSAGES, SUPERSTEP_SUM, TASK_INPUT_RECORDS, TASK_OUTPUT_RECORDS, IO_BYTES_READ, MESSAGE_BYTES_TRANSFERED, MESSAGE_BYTES_RECEIVED, TOTAL_MESSAGES_SENT, TOTAL_MESSAGES_RECEIVED, COMPRESSED_BYTES_SENT, COMPRESSED_BYTES_RECEIVED, TIME_IN_SYNC_MS
+ COMPRESSED_MESSAGES, SUPERSTEP_SUM, TASK_INPUT_RECORDS,
+ TASK_OUTPUT_RECORDS, IO_BYTES_READ, MESSAGE_BYTES_TRANSFERED,
+ MESSAGE_BYTES_RECEIVED, TOTAL_MESSAGES_SENT, TOTAL_MESSAGES_RECEIVED,
+ COMPRESSED_BYTES_SENT, COMPRESSED_BYTES_RECEIVED, TIME_IN_SYNC_MS
}
private final Configuration conf;
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java Thu Feb 21 06:38:33 2013
@@ -69,7 +69,8 @@ public final class BSPTask extends Task
boolean shouldKillSelf = false;
try {
if (LOG.isDebugEnabled())
- LOG.debug("Pinging at time " + Calendar.getInstance().getTimeInMillis());
+ LOG.debug("Pinging at time "
+ + Calendar.getInstance().getTimeInMillis());
// if the RPC call returns false, it means that groomserver does not
// have knowledge of this task.
shouldKillSelf = !(pingRPC.ping(taskId) && bspThread.isAlive());
@@ -114,8 +115,8 @@ public final class BSPTask extends Task
private void startPingingGroom(BSPJob job, BSPPeerProtocol umbilical) {
- long pingPeriod = job.getConfiguration().getLong(Constants.GROOM_PING_PERIOD,
- Constants.DEFAULT_GROOM_PING_PERIOD) / 2;
+ long pingPeriod = job.getConfiguration().getLong(
+ Constants.GROOM_PING_PERIOD, Constants.DEFAULT_GROOM_PING_PERIOD) / 2;
try {
if (pingPeriod > 0) {
@@ -156,7 +157,8 @@ public final class BSPTask extends Task
throws Exception {
BSP<KEYIN, VALUEIN, KEYOUT, VALUEOUT, M> bsp = (BSP<KEYIN, VALUEIN, KEYOUT, VALUEOUT, M>) ReflectionUtils
- .newInstance(job.getConfiguration().getClass("bsp.work.class", BSP.class),
+ .newInstance(
+ job.getConfiguration().getClass("bsp.work.class", BSP.class),
job.getConfiguration());
// The policy is to throw the first exception and log the remaining.
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileInputFormat.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileInputFormat.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileInputFormat.java Thu Feb 21 06:38:33 2013
@@ -23,7 +23,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -35,6 +34,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.net.NetworkTopology;
@@ -210,12 +210,12 @@ public abstract class CombineFileInputFo
// Finally, process all paths that do not belong to any pool.
ArrayList<Path> myPaths = new ArrayList<Path>();
- for (Path path : paths) {
- if (path == null) { // already processed
- continue;
- }
- myPaths.add(path);
+ for (Path path : paths) {
+ if (path == null) { // already processed
+ continue;
}
+ myPaths.add(path);
+ }
// create splits for all files that are not in any pool.
getMoreSplits(bspJob, myPaths.toArray(new Path[myPaths.size()]), maxSize,
minSizeNode, minSizeRack, splits);
@@ -261,48 +261,47 @@ public abstract class CombineFileInputFo
// process all nodes and create splits that are local
// to a node.
- for (Map.Entry<String, List<OneBlockInfo>> one : nodeToBlocks
- .entrySet()) {
+ for (Map.Entry<String, List<OneBlockInfo>> one : nodeToBlocks.entrySet()) {
- nodes.add(one.getKey());
- List<OneBlockInfo> blocksInNode = one.getValue();
+ nodes.add(one.getKey());
+ List<OneBlockInfo> blocksInNode = one.getValue();
- // for each block, copy it into validBlocks. Delete it from
- // blockToNodes so that the same block does not appear in
- // two different splits.
- for (OneBlockInfo oneblock : blocksInNode) {
- if (blockToNodes.containsKey(oneblock)) {
- validBlocks.add(oneblock);
- blockToNodes.remove(oneblock);
- curSplitSize += oneblock.length;
-
- // if the accumulated split size exceeds the maximum, then
- // create this split.
- if (maxSize != 0 && curSplitSize >= maxSize) {
- // create an input split and add it to the splits array
- addCreatedSplit(job, splits, nodes, validBlocks);
- curSplitSize = 0;
- validBlocks.clear();
- }
- }
- }
- // if there were any blocks left over and their combined size is
- // larger than minSplitNode, then combine them into one split.
- // Otherwise add them back to the unprocessed pool. It is likely
- // that they will be combined with other blocks from the same rack later
- // on.
- if (minSizeNode != 0 && curSplitSize >= minSizeNode) {
- // create an input split and add it to the splits array
- addCreatedSplit(job, splits, nodes, validBlocks);
- } else {
- for (OneBlockInfo oneblock : validBlocks) {
- blockToNodes.put(oneblock, oneblock.hosts);
- }
+ // for each block, copy it into validBlocks. Delete it from
+ // blockToNodes so that the same block does not appear in
+ // two different splits.
+ for (OneBlockInfo oneblock : blocksInNode) {
+ if (blockToNodes.containsKey(oneblock)) {
+ validBlocks.add(oneblock);
+ blockToNodes.remove(oneblock);
+ curSplitSize += oneblock.length;
+
+ // if the accumulated split size exceeds the maximum, then
+ // create this split.
+ if (maxSize != 0 && curSplitSize >= maxSize) {
+ // create an input split and add it to the splits array
+ addCreatedSplit(job, splits, nodes, validBlocks);
+ curSplitSize = 0;
+ validBlocks.clear();
}
- validBlocks.clear();
- nodes.clear();
- curSplitSize = 0;
+ }
+ }
+ // if there were any blocks left over and their combined size is
+ // larger than minSplitNode, then combine them into one split.
+ // Otherwise add them back to the unprocessed pool. It is likely
+ // that they will be combined with other blocks from the same rack later
+ // on.
+ if (minSizeNode != 0 && curSplitSize >= minSizeNode) {
+ // create an input split and add it to the splits array
+ addCreatedSplit(job, splits, nodes, validBlocks);
+ } else {
+ for (OneBlockInfo oneblock : validBlocks) {
+ blockToNodes.put(oneblock, oneblock.hosts);
+ }
}
+ validBlocks.clear();
+ nodes.clear();
+ curSplitSize = 0;
+ }
// if blocks in a rack are below the specified minimum size, then keep them
// in 'overflow'. After the processing of all racks is complete, these
@@ -322,57 +321,56 @@ public abstract class CombineFileInputFo
// split size).
// iterate over all racks
- for (Map.Entry<String, List<OneBlockInfo>> one : rackToBlocks
- .entrySet()) {
+ for (Map.Entry<String, List<OneBlockInfo>> one : rackToBlocks.entrySet()) {
- racks.add(one.getKey());
- List<OneBlockInfo> blocks = one.getValue();
+ racks.add(one.getKey());
+ List<OneBlockInfo> blocks = one.getValue();
- // for each block, copy it into validBlocks. Delete it from
- // blockToNodes so that the same block does not appear in
- // two different splits.
- boolean createdSplit = false;
- for (OneBlockInfo oneblock : blocks) {
- if (blockToNodes.containsKey(oneblock)) {
- validBlocks.add(oneblock);
- blockToNodes.remove(oneblock);
- curSplitSize += oneblock.length;
-
- // if the accumulated split size exceeds the maximum, then
- // create this split.
- if (maxSize != 0 && curSplitSize >= maxSize) {
- // create an input split and add it to the splits array
- addCreatedSplit(job, splits, getHosts(racks), validBlocks);
- createdSplit = true;
- break;
- }
- }
+ // for each block, copy it into validBlocks. Delete it from
+ // blockToNodes so that the same block does not appear in
+ // two different splits.
+ boolean createdSplit = false;
+ for (OneBlockInfo oneblock : blocks) {
+ if (blockToNodes.containsKey(oneblock)) {
+ validBlocks.add(oneblock);
+ blockToNodes.remove(oneblock);
+ curSplitSize += oneblock.length;
+
+ // if the accumulated split size exceeds the maximum, then
+ // create this split.
+ if (maxSize != 0 && curSplitSize >= maxSize) {
+ // create an input split and add it to the splits array
+ addCreatedSplit(job, splits, getHosts(racks), validBlocks);
+ createdSplit = true;
+ break;
}
+ }
+ }
- // if we created a split, then just go to the next rack
- if (createdSplit) {
- curSplitSize = 0;
- validBlocks.clear();
- racks.clear();
- continue;
- }
+ // if we created a split, then just go to the next rack
+ if (createdSplit) {
+ curSplitSize = 0;
+ validBlocks.clear();
+ racks.clear();
+ continue;
+ }
- if (!validBlocks.isEmpty()) {
- if (minSizeRack != 0 && curSplitSize >= minSizeRack) {
- // if there is a mimimum size specified, then create a single split
- // otherwise, store these blocks into overflow data structure
- addCreatedSplit(job, splits, getHosts(racks), validBlocks);
- } else {
- // There were a few blocks in this rack that remained to be
- // processed.
- // Keep them in 'overflow' block list. These will be combined later.
- overflowBlocks.addAll(validBlocks);
- }
- }
- curSplitSize = 0;
- validBlocks.clear();
- racks.clear();
+ if (!validBlocks.isEmpty()) {
+ if (minSizeRack != 0 && curSplitSize >= minSizeRack) {
+ // if there is a mimimum size specified, then create a single split
+ // otherwise, store these blocks into overflow data structure
+ addCreatedSplit(job, splits, getHosts(racks), validBlocks);
+ } else {
+ // There were a few blocks in this rack that remained to be
+ // processed.
+ // Keep them in 'overflow' block list. These will be combined later.
+ overflowBlocks.addAll(validBlocks);
+ }
}
+ curSplitSize = 0;
+ validBlocks.clear();
+ racks.clear();
+ }
}
assert blockToNodes.isEmpty();
@@ -387,7 +385,7 @@ public abstract class CombineFileInputFo
// This might cause an exiting rack location to be re-added,
// but it should be ok.
- Collections.addAll(racks, oneblock.racks);
+ Collections.addAll(racks, oneblock.racks);
// if the accumulated split size exceeds the maximum, then
// create this split.
@@ -425,7 +423,7 @@ public abstract class CombineFileInputFo
// add this split to the list that is returned
CombineFileSplit thissplit = new CombineFileSplit(job, fl, offset, length,
- locations.toArray(new String[locations.size()]));
+ locations.toArray(new String[locations.size()]));
splitList.add(thissplit);
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileSplit.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileSplit.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileSplit.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileSplit.java Thu Feb 21 06:38:33 2013
@@ -179,7 +179,8 @@ public class CombineFileSplit implements
if (i == 0) {
sb.append("Paths:");
}
- sb.append(paths[i].toUri().getPath()).append(":").append(startoffset[i]).append("+").append(lengths[i]);
+ sb.append(paths[i].toUri().getPath()).append(":").append(startoffset[i])
+ .append("+").append(lengths[i]);
if (i < paths.length - 1) {
sb.append(",");
}
@@ -187,9 +188,9 @@ public class CombineFileSplit implements
if (locations != null) {
String locs = "";
StringBuffer locsb = new StringBuffer();
- for (String location : locations) {
- locsb.append(location).append(":");
- }
+ for (String location : locations) {
+ locsb.append(location).append(":");
+ }
locs = locsb.toString();
sb.append(" Locations:").append(locs).append("; ");
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/Counters.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/Counters.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/Counters.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/Counters.java Thu Feb 21 06:38:33 2013
@@ -614,7 +614,8 @@ public class Counters implements Writabl
for (Group group : this) {
sb.append("\n\t").append(group.getDisplayName());
for (Counter counter : group) {
- sb.append("\n\t\t").append(counter.getDisplayName()).append("=").append(counter.getCounter());
+ sb.append("\n\t\t").append(counter.getDisplayName()).append("=")
+ .append(counter.getCounter());
}
}
return sb.toString();
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/Directive.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/Directive.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/Directive.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/Directive.java Thu Feb 21 06:38:33 2013
@@ -45,7 +45,7 @@ public class Directive implements Writab
}
}
- public Directive() {
+ public Directive() {
}
public Directive(Directive.Type type) {
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java Thu Feb 21 06:38:33 2013
@@ -143,8 +143,8 @@ public abstract class FileInputFormat<K,
} else {
for (FileStatus globStat : matches) {
if (globStat.isDir()) {
- Collections.addAll(result, fs.listStatus(globStat.getPath(),
- inputFilter));
+ Collections.addAll(result,
+ fs.listStatus(globStat.getPath(), inputFilter));
} else {
result.add(globStat);
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/FileOutputFormat.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileOutputFormat.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/FileOutputFormat.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/FileOutputFormat.java Thu Feb 21 06:38:33 2013
@@ -60,8 +60,8 @@ public abstract class FileOutputFormat<K
public static void setOutputCompressorClass(BSPJob conf,
Class<? extends CompressionCodec> codecClass) {
setCompressOutput(conf, true);
- conf.getConfiguration().setClass("bsp.output.compression.codec", codecClass,
- CompressionCodec.class);
+ conf.getConfiguration().setClass("bsp.output.compression.codec",
+ codecClass, CompressionCodec.class);
}
/**
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java Thu Feb 21 06:38:33 2013
@@ -33,7 +33,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -41,6 +40,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -98,7 +98,7 @@ public class GroomServer implements Runn
NORMAL, COMPUTE, SYNC, BARRIER, STALE, INTERRUPTED, DENIED
}
- private HttpServer server;
+ private HttpServer server;
private ZooKeeper zk = null;
// Running States and its related things
@@ -276,17 +276,17 @@ public class GroomServer implements Runn
LOG.debug("Got " + outOfContactTasks.size() + " oblivious tasks");
}
- for (TaskInProgress tip : outOfContactTasks) {
- try {
- LOG.debug("Purging task " + tip);
- purgeTask(tip, true);
- } catch (Exception e) {
- LOG.error(
- new StringBuilder("Error while removing a timed-out task - ")
- .append(tip.toString()), e);
+ for (TaskInProgress tip : outOfContactTasks) {
+ try {
+ LOG.debug("Purging task " + tip);
+ purgeTask(tip, true);
+ } catch (Exception e) {
+ LOG.error(
+ new StringBuilder("Error while removing a timed-out task - ")
+ .append(tip.toString()), e);
- }
}
+ }
outOfContactTasks.clear();
}
@@ -464,15 +464,15 @@ public class GroomServer implements Runn
LOG.debug(localDirs);
if (localDirs != null) {
- for (String localDir : localDirs) {
- try {
- LOG.info(localDir);
- DiskChecker.checkDir(new File(localDir));
- writable = true;
- } catch (DiskErrorException e) {
- LOG.warn("BSP Processor local " + e.getMessage());
- }
+ for (String localDir : localDirs) {
+ try {
+ LOG.info(localDir);
+ DiskChecker.checkDir(new File(localDir));
+ writable = true;
+ } catch (DiskErrorException e) {
+ LOG.warn("BSP Processor local " + e.getMessage());
}
+ }
}
if (!writable)
@@ -485,18 +485,17 @@ public class GroomServer implements Runn
public void deleteLocalFiles() throws IOException {
String[] localDirs = getLocalDirs();
- for (String localDir : localDirs) {
- FileSystem.getLocal(this.conf).delete(new Path(localDir), true);
- }
+ for (String localDir : localDirs) {
+ FileSystem.getLocal(this.conf).delete(new Path(localDir), true);
+ }
}
public void deleteLocalFiles(String subdir) throws IOException {
try {
String[] localDirs = getLocalDirs();
- for (String localDir : localDirs) {
- FileSystem.getLocal(this.conf).delete(new Path(localDir, subdir),
- true);
- }
+ for (String localDir : localDirs) {
+ FileSystem.getLocal(this.conf).delete(new Path(localDir, subdir), true);
+ }
} catch (NullPointerException e) {
LOG.info(e);
}
@@ -808,10 +807,10 @@ public class GroomServer implements Runn
// Task is out of contact if it has not pinged since more than
// monitorPeriod. A task is given a leeway of 10 times monitorPeriod
// to get started.
-
+
// TODO Please refactor this conditions
- // NOTE: (currentTime - tip.lastPingedTimestamp) > 6 * monitorPeriod
-
+ // NOTE: (currentTime - tip.lastPingedTimestamp) > 6 * monitorPeriod
+
if (tip.taskStatus.getRunState().equals(TaskStatus.State.RUNNING)
&& (((tip.lastPingedTimestamp == 0 && ((currentTime - tip.startTime) > 10 * monitorPeriod)) || ((tip.lastPingedTimestamp > 0) && (currentTime - tip.lastPingedTimestamp) > 6 * monitorPeriod)))) {
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerAction.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerAction.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerAction.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerAction.java Thu Feb 21 06:38:33 2013
@@ -58,7 +58,7 @@ public abstract class GroomServerAction
UPDATE_PEER
}
- /**
+ /**
* A factory-method to create objects of given {@link ActionType}.
*
* @param actionType the {@link ActionType} of object to create.
@@ -79,17 +79,15 @@ public abstract class GroomServerAction
case KILL_JOB: {
action = new KillJobAction();
}
- break;
- case RECOVER_TASK:
- {
+ break;
+ case RECOVER_TASK: {
action = new RecoverTaskAction();
}
- break;
- case UPDATE_PEER:
- {
+ break;
+ case UPDATE_PEER: {
action = new UpdatePeerAction();
}
- break;
+ break;
case REINIT_GROOM: {
action = new ReinitGroomAction();
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerStatus.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerStatus.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerStatus.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerStatus.java Thu Feb 21 06:38:33 2013
@@ -123,13 +123,13 @@ public class GroomServerStatus implement
*/
public int countTasks() {
int taskCount = 0;
- for (TaskStatus ts : taskReports) {
- TaskStatus.State state = ts.getRunState();
- if (state == TaskStatus.State.RUNNING
- || state == TaskStatus.State.UNASSIGNED) {
- taskCount++;
- }
+ for (TaskStatus ts : taskReports) {
+ TaskStatus.State state = ts.getRunState();
+ if (state == TaskStatus.State.RUNNING
+ || state == TaskStatus.State.UNASSIGNED) {
+ taskCount++;
}
+ }
return taskCount;
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java Thu Feb 21 06:38:33 2013
@@ -120,13 +120,14 @@ public class JobInProgress {
private TaskAllocationStrategy taskAllocationStrategy;
private FaultTolerantMasterService faultToleranceService;
-
+
/**
* Used only for unit tests.
+ *
* @param jobId
* @param conf
*/
- public JobInProgress(BSPJobID jobId, Configuration conf){
+ public JobInProgress(BSPJobID jobId, Configuration conf) {
this.conf = conf;
this.jobId = jobId;
master = null;
@@ -161,8 +162,8 @@ public class JobInProgress {
this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>(
numBSPTasks + 10);
- this.maxTaskAttempts = job.getConfiguration().getInt(Constants.MAX_TASK_ATTEMPTS,
- Constants.DEFAULT_MAX_TASK_ATTEMPTS);
+ this.maxTaskAttempts = job.getConfiguration().getInt(
+ Constants.MAX_TASK_ATTEMPTS, Constants.DEFAULT_MAX_TASK_ATTEMPTS);
this.profile = new JobProfile(job.getUser(), jobId, jobFile.toString(),
job.getJobName());
@@ -300,9 +301,8 @@ public class JobInProgress {
if (conf.getBoolean(Constants.FAULT_TOLERANCE_FLAG, false)) {
- Class<?> ftClass = conf.getClass(Constants.FAULT_TOLERANCE_CLASS,
- AsyncRcvdMsgCheckpointImpl.class ,
- BSPFaultTolerantService.class);
+ Class<?> ftClass = conf.getClass(Constants.FAULT_TOLERANCE_CLASS,
+ AsyncRcvdMsgCheckpointImpl.class, BSPFaultTolerantService.class);
if (ftClass != null) {
try {
faultToleranceService = ((BSPFaultTolerantService<?>) ReflectionUtils
@@ -340,25 +340,25 @@ public class JobInProgress {
Task result = null;
BSPResource[] resources = new BSPResource[0];
- for (TaskInProgress task : tasks) {
- if (!task.isRunning() && !task.isComplete()) {
+ for (TaskInProgress task : tasks) {
+ if (!task.isRunning() && !task.isComplete()) {
- String[] selectedGrooms = taskAllocationStrategy.selectGrooms(
- groomStatuses, taskCountInGroomMap, resources, task);
- GroomServerStatus groomStatus = taskAllocationStrategy
- .getGroomToAllocate(groomStatuses, selectedGrooms,
- taskCountInGroomMap, resources, task);
- if (groomStatus != null) {
- result = task.constructTask(groomStatus);
- } else if (LOG.isDebugEnabled()) {
- LOG.debug("Could not find a groom to schedule task");
- }
- if (result != null) {
- updateGroomTaskDetails(task.getGroomServerStatus(), result);
- }
- break;
- }
+ String[] selectedGrooms = taskAllocationStrategy.selectGrooms(
+ groomStatuses, taskCountInGroomMap, resources, task);
+ GroomServerStatus groomStatus = taskAllocationStrategy
+ .getGroomToAllocate(groomStatuses, selectedGrooms,
+ taskCountInGroomMap, resources, task);
+ if (groomStatus != null) {
+ result = task.constructTask(groomStatus);
+ } else if (LOG.isDebugEnabled()) {
+ LOG.debug("Could not find a groom to schedule task");
+ }
+ if (result != null) {
+ updateGroomTaskDetails(task.getGroomServerStatus(), result);
+ }
+ break;
}
+ }
counters.incrCounter(JobCounter.LAUNCHED_TASKS, 1L);
return result;
@@ -542,9 +542,9 @@ public class JobInProgress {
//
// kill all TIPs.
//
- for (TaskInProgress task : tasks) {
- task.kill();
- }
+ for (TaskInProgress task : tasks) {
+ task.kill();
+ }
garbageCollect();
}
@@ -557,12 +557,12 @@ public class JobInProgress {
*/
synchronized void garbageCollect() {
try {
-
- if(LOG.isDebugEnabled()){
+
+ if (LOG.isDebugEnabled()) {
LOG.debug("Removing " + localJobFile + " and " + localJarFile
+ " getJobFile = " + profile.getJobFile());
}
-
+
// Definitely remove the local-disk copy of the job file
if (localJobFile != null) {
localFs.delete(localJobFile, true);
@@ -644,21 +644,19 @@ public class JobInProgress {
return false;
if (!faultToleranceService.isAlreadyRecovered(tip)) {
- if(LOG.isDebugEnabled()){
+ if (LOG.isDebugEnabled()) {
LOG.debug("Adding recovery task " + tip.getCurrentTaskAttemptId());
}
recoveryTasks.add(tip);
status.setRunState(JobStatus.RECOVERING);
return true;
- }
- else if(LOG.isDebugEnabled()){
+ } else if (LOG.isDebugEnabled()) {
LOG.debug("Avoiding recovery task " + tip.getCurrentTaskAttemptId());
}
return false;
-
+
}
-
-
+
/**
*
* @return Returns the list of tasks in progress that has to be recovered.
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgressListener.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgressListener.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgressListener.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgressListener.java Thu Feb 21 06:38:33 2013
@@ -40,9 +40,10 @@ abstract class JobInProgressListener {
* @throws IOException
*/
public abstract void jobRemoved(JobInProgress job) throws IOException;
-
+
/**
* Invoked when a task in job has to be recovered by {@link BSPMaster}.
+ *
* @param job The job to which the task belongs to.
* @param task that has to be recovered
* @throws IOException
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java Thu Feb 21 06:38:33 2013
@@ -65,6 +65,7 @@ public class PartitioningRunner extends
converter = ReflectionUtils.newInstance(conf.getClass(
Constants.RUNTIME_PARTITION_RECORDCONVERTER,
DefaultRecordConverter.class, RecordConverter.class), conf);
+ converter.setup(conf);
if (conf.get(Constants.RUNTIME_PARTITIONING_DIR) == null) {
this.partitionDir = new Path(inputDir + "/partitions");
@@ -82,6 +83,8 @@ public class PartitioningRunner extends
*/
public static interface RecordConverter {
+ public void setup(Configuration conf);
+
/**
* Should return the Key-Value pair constructed from the input format.
*
@@ -94,9 +97,10 @@ public class PartitioningRunner extends
KeyValuePair<Writable, Writable> inputRecord, Configuration conf);
public int getPartitionId(KeyValuePair<Writable, Writable> inputRecord,
- @SuppressWarnings("rawtypes") Partitioner partitioner,
- Configuration conf, @SuppressWarnings("rawtypes") BSPPeer peer,
- int numTasks);
+ @SuppressWarnings("rawtypes")
+ Partitioner partitioner, Configuration conf,
+ @SuppressWarnings("rawtypes")
+ BSPPeer peer, int numTasks);
}
/**
@@ -113,12 +117,18 @@ public class PartitioningRunner extends
@SuppressWarnings("unchecked")
@Override
public int getPartitionId(KeyValuePair<Writable, Writable> outputRecord,
- @SuppressWarnings("rawtypes") Partitioner partitioner,
- Configuration conf, @SuppressWarnings("rawtypes") BSPPeer peer,
- int numTasks) {
+ @SuppressWarnings("rawtypes")
+ Partitioner partitioner, Configuration conf,
+ @SuppressWarnings("rawtypes")
+ BSPPeer peer, int numTasks) {
return Math.abs(partitioner.getPartition(outputRecord.getKey(),
outputRecord.getValue(), numTasks));
}
+
+ @Override
+ public void setup(Configuration conf) {
+
+ }
}
@Override
@@ -159,7 +169,7 @@ public class PartitioningRunner extends
}
values.get(index).put(outputPair.getKey(), outputPair.getValue());
}
-
+
// The reason of use of Memory is to reduce file opens
for (Map.Entry<Integer, Map<Writable, Writable>> e : values.entrySet()) {
Path destFile = new Path(partitionDir + "/part-" + e.getKey() + "/file-"
@@ -177,55 +187,55 @@ public class PartitioningRunner extends
FileStatus[] status = fs.listStatus(partitionDir);
// Call sync() one more time to avoid concurrent access
peer.sync();
-
+
// merge files into one.
// TODO if we use header info, we might able to merge files without full
// scan.
- for (FileStatus statu : status) {
- int partitionID = Integer.parseInt(statu.getPath().getName()
- .split("[-]")[1]);
- int denom = desiredNum / peer.getNumPeers();
- int assignedID = partitionID;
- if (denom > 1) {
- assignedID = partitionID / denom;
- }
-
- if (assignedID == peer.getNumPeers())
- assignedID = assignedID - 1;
+ for (FileStatus statu : status) {
+ int partitionID = Integer
+ .parseInt(statu.getPath().getName().split("[-]")[1]);
+ int denom = desiredNum / peer.getNumPeers();
+ int assignedID = partitionID;
+ if (denom > 1) {
+ assignedID = partitionID / denom;
+ }
+
+ if (assignedID == peer.getNumPeers())
+ assignedID = assignedID - 1;
+
+ // TODO set replica factor to 1.
+ // TODO and check whether we can write to specific DataNode.
+ if (assignedID == peer.getPeerIndex()) {
+ Path partitionFile = new Path(partitionDir + "/"
+ + getPartitionName(partitionID));
+
+ FileStatus[] files = fs.listStatus(statu.getPath());
+ SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
+ partitionFile, outputKeyClass, outputValueClass,
+ CompressionType.NONE);
+
+ for (int i = 0; i < files.length; i++) {
+ LOG.debug("merge '" + files[i].getPath() + "' into " + partitionDir
+ + "/" + getPartitionName(partitionID));
+
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs,
+ files[i].getPath(), conf);
+
+ Writable key = (Writable) ReflectionUtils.newInstance(outputKeyClass,
+ conf);
+ Writable value = (Writable) ReflectionUtils.newInstance(
+ outputValueClass, conf);
- // TODO set replica factor to 1.
- // TODO and check whether we can write to specific DataNode.
- if (assignedID == peer.getPeerIndex()) {
- Path partitionFile = new Path(partitionDir + "/"
- + getPartitionName(partitionID));
-
- FileStatus[] files = fs.listStatus(statu.getPath());
- SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
- partitionFile, outputKeyClass, outputValueClass,
- CompressionType.NONE);
-
- for (int i = 0; i < files.length; i++) {
- LOG.debug("merge '" + files[i].getPath() + "' into " + partitionDir
- + "/" + getPartitionName(partitionID));
-
- SequenceFile.Reader reader = new SequenceFile.Reader(fs,
- files[i].getPath(), conf);
-
- Writable key = (Writable) ReflectionUtils.newInstance(outputKeyClass,
- conf);
- Writable value = (Writable) ReflectionUtils.newInstance(
- outputValueClass, conf);
-
- while (reader.next(key, value)) {
- writer.append(key, value);
- }
- reader.close();
- }
-
- writer.close();
- fs.delete(statu.getPath(), true);
+ while (reader.next(key, value)) {
+ writer.append(key, value);
}
+ reader.close();
+ }
+
+ writer.close();
+ fs.delete(statu.getPath(), true);
}
+ }
}
@SuppressWarnings("rawtypes")
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/QueueManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/QueueManager.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/QueueManager.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/QueueManager.java Thu Feb 21 06:38:33 2013
@@ -18,9 +18,8 @@
package org.apache.hama.bsp;
import java.io.IOException;
-
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/RecoverTaskAction.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/RecoverTaskAction.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/RecoverTaskAction.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/RecoverTaskAction.java Thu Feb 21 06:38:33 2013
@@ -45,8 +45,8 @@ public class RecoverTaskAction extends G
public Task getTask() {
return task;
}
-
- public long getSuperstepCount(){
+
+ public long getSuperstepCount() {
return superstepNumber.get();
}
@@ -54,7 +54,7 @@ public class RecoverTaskAction extends G
public void write(DataOutput out) throws IOException {
task.write(out);
superstepNumber.write(out);
-
+
}
@Override
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java Thu Feb 21 06:38:33 2013
@@ -251,16 +251,16 @@ class SimpleTaskScheduler extends TaskSc
}
// assembly into actions
- for (Task task : taskSet) {
- GroomServerStatus groomStatus = jip.getGroomStatusForTask(task);
- List<GroomServerAction> taskActions = actionMap.get(groomStatus);
- if (taskActions == null) {
- taskActions = new ArrayList<GroomServerAction>(
- groomStatus.getMaxTasks());
- }
- taskActions.add(new LaunchTaskAction(task));
- actionMap.put(groomStatus, taskActions);
+ for (Task task : taskSet) {
+ GroomServerStatus groomStatus = jip.getGroomStatusForTask(task);
+ List<GroomServerAction> taskActions = actionMap.get(groomStatus);
+ if (taskActions == null) {
+ taskActions = new ArrayList<GroomServerAction>(
+ groomStatus.getMaxTasks());
}
+ taskActions.add(new LaunchTaskAction(task));
+ actionMap.put(groomStatus, taskActions);
+ }
sendDirectivesToGrooms(actionMap);
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskCompletionEvent.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskCompletionEvent.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskCompletionEvent.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskCompletionEvent.java Thu Feb 21 06:38:33 2013
@@ -29,7 +29,7 @@ public class TaskCompletionEvent impleme
FAILED, KILLED, SUCCEEDED, OBSOLETE, TIPFAILED
}
- private int eventId;
+ private int eventId;
private String groomServerInfo;
private int taskRunTime; // using int since runtime is the time difference
private TaskAttemptID taskId;
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java Thu Feb 21 06:38:33 2013
@@ -18,11 +18,10 @@
package org.apache.hama.bsp;
import java.io.IOException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -68,7 +67,7 @@ public class TaskInProgress {
// The first taskid of this tip
private TaskAttemptID firstTaskId;
-
+
private TaskAttemptID currentTaskId;
// Map from task Id -> GroomServer Id, contains tasks that are
@@ -145,17 +144,17 @@ public class TaskInProgress {
Map<GroomServerStatus, Integer> tasksInGroomMap,
String[] possibleLocations) {
- for (String location : possibleLocations) {
- GroomServerStatus groom = grooms.get(location);
- if (groom == null)
- continue;
- Integer taskInGroom = tasksInGroomMap.get(groom);
- taskInGroom = (taskInGroom == null) ? 0 : taskInGroom;
- if (taskInGroom < groom.getMaxTasks()
- && location.equals(groom.getGroomHostName())) {
- return groom.getGroomHostName();
- }
+ for (String location : possibleLocations) {
+ GroomServerStatus groom = grooms.get(location);
+ if (groom == null)
+ continue;
+ Integer taskInGroom = tasksInGroomMap.get(groom);
+ taskInGroom = (taskInGroom == null) ? 0 : taskInGroom;
+ if (taskInGroom < groom.getMaxTasks()
+ && location.equals(groom.getGroomHostName())) {
+ return groom.getGroomHostName();
}
+ }
return null;
}
@@ -168,16 +167,16 @@ public class TaskInProgress {
private String getAnyGroomToSchedule(Map<String, GroomServerStatus> grooms,
Map<GroomServerStatus, Integer> tasksInGroomMap) {
- for (String s : grooms.keySet()) {
- GroomServerStatus groom = grooms.get(s);
- if (groom == null)
- continue;
- Integer taskInGroom = tasksInGroomMap.get(groom);
- taskInGroom = (taskInGroom == null) ? 0 : taskInGroom;
- if (taskInGroom < groom.getMaxTasks()) {
- return groom.getGroomHostName();
- }
+ for (String s : grooms.keySet()) {
+ GroomServerStatus groom = grooms.get(s);
+ if (groom == null)
+ continue;
+ Integer taskInGroom = tasksInGroomMap.get(groom);
+ taskInGroom = (taskInGroom == null) ? 0 : taskInGroom;
+ if (taskInGroom < groom.getMaxTasks()) {
+ return groom.getGroomHostName();
}
+ }
return null;
}
@@ -188,7 +187,7 @@ public class TaskInProgress {
* @return
*/
public Task constructTask(GroomServerStatus groomStatus) {
- if(groomStatus == null){
+ if (groomStatus == null) {
return null;
}
TaskAttemptID taskId = computeTaskId();
@@ -198,8 +197,8 @@ public class TaskInProgress {
String splitClass = null;
BytesWritable split = null;
if (rawSplit != null) {
- splitClass = rawSplit.getClassName();
- split = rawSplit.getBytes();
+ splitClass = rawSplit.getClassName();
+ split = rawSplit.getBytes();
}
currentTaskId = taskId;
String groomName = groomStatus.getGroomHostName();
@@ -218,8 +217,8 @@ public class TaskInProgress {
String splitClass = null;
BytesWritable split = null;
if (rawSplit != null) {
- splitClass = rawSplit.getClassName();
- split = rawSplit.getBytes();
+ splitClass = rawSplit.getClassName();
+ split = rawSplit.getBytes();
}
Task t = null;
String groomName = getGroomToSchedule(taskid, grooms, tasksInGroomMap,
@@ -492,8 +491,8 @@ public class TaskInProgress {
public RawSplit getFileSplit() {
return this.rawSplit;
}
-
- public TaskAttemptID getCurrentTaskAttemptId(){
+
+ public TaskAttemptID getCurrentTaskAttemptId() {
return this.currentTaskId;
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java Thu Feb 21 06:38:33 2013
@@ -112,9 +112,9 @@ public class TaskLog {
File[] oldTaskLogs = LOG_DIR.listFiles(new TaskLogsPurgeFilter(
purgeTimeStamp));
if (oldTaskLogs != null) {
- for (File oldTaskLog : oldTaskLogs) {
- FileUtil.fullyDelete(oldTaskLog);
- }
+ for (File oldTaskLog : oldTaskLogs) {
+ FileUtil.fullyDelete(oldTaskLog);
+ }
}
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java Thu Feb 21 06:38:33 2013
@@ -123,7 +123,7 @@ public class TaskRunner extends Thread {
int exit_code = bspProcess.waitFor();
if (!bspKilled && exit_code != 0) {
-
+
throw new IOException("BSP task process exit with nonzero status of "
+ exit_code + ". command = " + commands);
}
@@ -181,11 +181,11 @@ public class TaskRunner extends Thread {
}
File[] libs = new File(workDir, "lib").listFiles();
if (libs != null) {
- for (File lib : libs) {
- // add libs from jar to classpath
- classPath.append(SYSTEM_PATH_SEPARATOR);
- classPath.append(lib);
- }
+ for (File lib : libs) {
+ // add libs from jar to classpath
+ classPath.append(SYSTEM_PATH_SEPARATOR);
+ classPath.append(lib);
+ }
}
classPath.append(SYSTEM_PATH_SEPARATOR);
classPath.append(new File(workDir, "classes"));
@@ -204,11 +204,12 @@ public class TaskRunner extends Thread {
vargs.add(jvm.toString());
// bsp.child.java.opts
- String javaOpts = jobConf.getConfiguration().get("bsp.child.java.opts", "-Xmx200m");
+ String javaOpts = jobConf.getConfiguration().get("bsp.child.java.opts",
+ "-Xmx200m");
javaOpts = javaOpts.replace("@taskid@", task.getTaskID().toString());
String[] javaOptsSplit = javaOpts.split(" ");
- Collections.addAll(vargs, javaOptsSplit);
+ Collections.addAll(vargs, javaOptsSplit);
// Add classpath.
vargs.add("-classpath");
@@ -225,15 +226,14 @@ public class TaskRunner extends Thread {
vargs.add(groomServer.groomHostName);
vargs.add(Long.toString(groomServer.getStartSuperstep(task.getTaskID())));
TaskStatus status = groomServer.getTaskStatus(task.getTaskID());
-
- if(status != null &&
- TaskStatus.State.RECOVERING.equals(status.getRunState())){
+
+ if (status != null
+ && TaskStatus.State.RECOVERING.equals(status.getRunState())) {
vargs.add(TaskStatus.State.RECOVERING.name());
- }
- else{
+ } else {
vargs.add(TaskStatus.State.RUNNING.name());
}
-
+
}
return vargs;
}
@@ -296,7 +296,7 @@ public class TaskRunner extends Thread {
if (bspProcess != null) {
bspProcess.destroy();
}
-
+
}
/**
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/TextArrayWritable.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/TextArrayWritable.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/TextArrayWritable.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/TextArrayWritable.java Thu Feb 21 06:38:33 2013
@@ -26,4 +26,4 @@ public class TextArrayWritable extends A
super(Text.class);
}
-}
\ No newline at end of file
+}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/UpdatePeerAction.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/UpdatePeerAction.java?rev=1448523&r1=1448522&r2=1448523&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/UpdatePeerAction.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/UpdatePeerAction.java Thu Feb 21 06:38:33 2013
@@ -24,21 +24,21 @@ import java.io.IOException;
import org.apache.hadoop.io.Text;
/**
- * Represents a directive from the {@link org.apache.hama.bsp.BSPMaster}
- * to the {@link org.apache.hama.bsp.GroomServer} to kill a task.
+ * Represents a directive from the {@link org.apache.hama.bsp.BSPMaster} to the
+ * {@link org.apache.hama.bsp.GroomServer} to kill a task.
*/
class UpdatePeerAction extends GroomServerAction {
TaskAttemptID taskId;
TaskAttemptID peerTaskId;
Text groomName;
-
+
public UpdatePeerAction() {
super(ActionType.UPDATE_PEER);
taskId = new TaskAttemptID();
groomName = new Text("");
}
-
- public UpdatePeerAction(TaskAttemptID taskId, TaskAttemptID peerTaskId,
+
+ public UpdatePeerAction(TaskAttemptID taskId, TaskAttemptID peerTaskId,
String groom) {
super(ActionType.UPDATE_PEER);
this.taskId = taskId;
@@ -49,15 +49,15 @@ class UpdatePeerAction extends GroomServ
public TaskAttemptID getTaskID() {
return taskId;
}
-
- public TaskAttemptID getPeerTaskID(){
+
+ public TaskAttemptID getPeerTaskID() {
return peerTaskId;
}
-
- public String getGroomName(){
+
+ public String getGroomName() {
return groomName.toString();
}
-
+
@Override
public void write(DataOutput out) throws IOException {
taskId.write(out);