You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2015/04/13 03:00:07 UTC

svn commit: r1673079 - in /hama/trunk: ./ core/src/main/java/org/apache/hama/ core/src/main/java/org/apache/hama/bsp/ core/src/main/java/org/apache/hama/bsp/message/ core/src/test/java/org/apache/hama/bsp/ core/src/test/java/org/apache/hama/bsp/message...

Author: edwardyoon
Date: Mon Apr 13 01:00:06 2015
New Revision: 1673079

URL: http://svn.apache.org/r1673079
Log:
HAMA-946: Improving graph package

Removed:
    hama/trunk/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java
Modified:
    hama/trunk/core/src/main/java/org/apache/hama/Constants.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMessageBundle.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaAsyncMessageManager.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaMessageManager.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/message/compress/TestBSPMessageCompressor.java
    hama/trunk/examples/src/main/java/org/apache/hama/examples/BipartiteMatching.java
    hama/trunk/examples/src/main/java/org/apache/hama/examples/DynamicGraph.java
    hama/trunk/examples/src/main/java/org/apache/hama/examples/KCore.java
    hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
    hama/trunk/examples/src/test/java/org/apache/hama/examples/BipartiteMatchingTest.java
    hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java
    hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
    hama/trunk/pom.xml

Modified: hama/trunk/core/src/main/java/org/apache/hama/Constants.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/Constants.java?rev=1673079&r1=1673078&r2=1673079&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/Constants.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/Constants.java Mon Apr 13 01:00:06 2015
@@ -129,7 +129,7 @@ public interface Constants {
   public static final String RUNTIME_DESIRED_PEERS_COUNT = "desired.num.of.tasks";
   public static final String RUNTIME_PARTITION_RECORDCONVERTER = "bsp.runtime.partition.recordconverter";
 
-  public static final String PARTITION_SORT_BY_KEY = "bsp.partition.sort.by.converted.record";   
+  public static final String PARTITION_SORT_BY_KEY = "bsp.partition.sort.by.converted.record";
 
   // /////////////////////////////////////
   // Constants for ZooKeeper

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1673079&r1=1673078&r2=1673079&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Mon Apr 13 01:00:06 2015
@@ -45,7 +45,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.Text;
@@ -58,6 +58,11 @@ import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.hama.Constants;
 import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.message.MessageManager;
+import org.apache.hama.bsp.message.OutgoingMessageManager;
+import org.apache.hama.bsp.message.OutgoingPOJOMessageBundle;
+import org.apache.hama.bsp.message.queue.MessageQueue;
+import org.apache.hama.bsp.message.queue.SortedMemoryQueue;
 import org.apache.hama.ipc.HamaRPCProtocolVersion;
 import org.apache.hama.ipc.JobSubmissionProtocol;
 import org.apache.hama.ipc.RPC;
@@ -346,6 +351,7 @@ public class BSPJobClient extends Config
 
       InputSplit[] splits = job.getInputFormat().getSplits(job, maxTasks);
 
+      /*
       job = partition(job, splits, maxTasks);
       maxTasks = job.getInt("hama.partition.count", maxTasks);
 
@@ -358,6 +364,7 @@ public class BSPJobClient extends Config
             "Job failed! The number of splits has exceeded the number of max tasks. The number of splits: "
                 + splits.length + ", The number of max tasks: " + maxTasks);
       }
+      */
 
       job.setNumBspTask(writeSplits(job, splits, submitSplitFile, maxTasks));
       job.set("bsp.job.split.file", submitSplitFile.toString());
@@ -451,13 +458,22 @@ public class BSPJobClient extends Config
             + partitioningJob.getJobName());
         LOG.debug("partitioningJob input: "
             + partitioningJob.get(Constants.JOB_INPUT_DIR));
+        
+        partitioningJob.getConfiguration().setClass(MessageManager.OUTGOING_MESSAGE_MANAGER_CLASS,
+            OutgoingPOJOMessageBundle.class, OutgoingMessageManager.class);
+        partitioningJob.getConfiguration().setClass(MessageManager.RECEIVE_QUEUE_TYPE_CLASS,
+            SortedMemoryQueue.class, MessageQueue.class);
+        
         partitioningJob.setInputFormat(job.getInputFormat().getClass());
         partitioningJob.setInputKeyClass(job.getInputKeyClass());
         partitioningJob.setInputValueClass(job.getInputValueClass());
-        partitioningJob.setOutputFormat(NullOutputFormat.class);
-        partitioningJob.setOutputKeyClass(NullWritable.class);
-        partitioningJob.setOutputValueClass(NullWritable.class);
+        
+        partitioningJob.setOutputFormat(SequenceFileOutputFormat.class);
+        partitioningJob.setOutputKeyClass(job.getInputKeyClass());
+        partitioningJob.setOutputValueClass(job.getInputValueClass());
+        
         partitioningJob.setBspClass(PartitioningRunner.class);
+        partitioningJob.setMessageClass(MapWritable.class);
         partitioningJob.set("bsp.partitioning.runner.job", "true");
         partitioningJob.getConfiguration().setBoolean(
             Constants.ENABLE_RUNTIME_PARTITIONING, false);
@@ -554,15 +570,15 @@ public class BSPJobClient extends Config
       RawSplit rawSplit = new RawSplit();
       for (InputSplit split : splits) {
 
+        /*
         // set partitionID to rawSplit
-        if (split.getClass().getName().equals(FileSplit.class.getName())
-            && job.getConfiguration().get(Constants.RUNTIME_PARTITIONING_CLASS) != null
-            && job.get("bsp.partitioning.runner.job") == null) {
+        if (split.getClass().getName().equals(FileSplit.class.getName())) {
           LOG.debug(((FileSplit) split).getPath().getName());
           String[] extractPartitionID = ((FileSplit) split).getPath().getName()
               .split("[-]");
           rawSplit.setPartitionID(Integer.parseInt(extractPartitionID[1]));
         }
+        */
 
         rawSplit.setClassName(split.getClass().getName());
         buffer.reset();

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java?rev=1673079&r1=1673078&r2=1673079&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java Mon Apr 13 01:00:06 2015
@@ -65,11 +65,29 @@ public class BSPMessageBundle<M extends
     }
 
     kryo.writeObject(output, message);
-    output.flush();
-
     bundleSize++;
   }
-
+  
+  public void addMessages(Iterator<M> iterator) {
+    M message = iterator.next();
+    if (className == null) {
+      className = message.getClass().getName();
+      kryo.register(message.getClass());
+    }
+    
+    kryo.writeObject(output, message);
+    bundleSize++;
+    
+    while(iterator.hasNext()) {
+      kryo.writeObject(output, iterator.next());
+      bundleSize++;
+    }
+  }
+  
+  public void finishAddition() {
+    output.flush();
+  }
+  
   public byte[] getBuffer() {
     return outputStream.toByteArray();
   }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1673079&r1=1673078&r2=1673079&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java Mon Apr 13 01:00:06 2015
@@ -352,7 +352,7 @@ public class LocalBSPRunner implements J
         throws IOException {
       peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGE_BYTES_TRANSFERED,
           bundle.getLength());
-
+      
       MANAGER_MAP.get(addr).localQueueForNextIteration.addBundle(bundle);
       peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED,
           bundle.size());

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java?rev=1673079&r1=1673078&r2=1673079&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java Mon Apr 13 01:00:06 2015
@@ -170,6 +170,7 @@ public abstract class AbstractMessageMan
   public void send(String peerName, M msg) throws IOException {
     outgoingMessageManager.addMessage(peerName, msg);
     peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_SENT, 1L);
+
     notifySentMessage(peerName, msg);
   }
 

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java?rev=1673079&r1=1673078&r2=1673079&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java Mon Apr 13 01:00:06 2015
@@ -70,6 +70,9 @@ public class OutgoingPOJOMessageBundle<M
 
   @Override
   public Iterator<Entry<InetSocketAddress, BSPMessageBundle<M>>> getBundleIterator() {
+    for(BSPMessageBundle<M> b : outgoingBundles.values()) {
+      b.finishAddition();
+    }
     return outgoingBundles.entrySet().iterator();
   }
 

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMessageBundle.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMessageBundle.java?rev=1673079&r1=1673078&r2=1673079&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMessageBundle.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMessageBundle.java Mon Apr 13 01:00:06 2015
@@ -38,6 +38,7 @@ public class TestBSPMessageBundle extend
     // Serialize it.
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     bundle.write(new DataOutputStream(baos));
+    bundle.finishAddition();
     baos.close();
     // Deserialize it.
     BSPMessageBundle<BytesWritable> readBundle = new BSPMessageBundle<BytesWritable>();
@@ -64,6 +65,8 @@ public class TestBSPMessageBundle extend
       testMessages[i] = msg;
       bundle.addMessage(testMessages[i]);
     }
+    bundle.finishAddition();
+    
     // Serialize it.
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     bundle.write(new DataOutputStream(baos));

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java?rev=1673079&r1=1673078&r2=1673079&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java Mon Apr 13 01:00:06 2015
@@ -653,7 +653,8 @@ public class TestCheckpoint extends Test
 
     BSPMessageBundle<Text> bundleRead = messenger.getLoopbackBundle();
     assertEquals(5, bundleRead.size());
-
+    bundleRead.finishAddition();
+    
     String recoveredMsg = bundleRead.iterator().next().toString();
     assertEquals(recoveredMsg, "data");
     dfs.delete(new Path("checkpoint"), true);

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaAsyncMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaAsyncMessageManager.java?rev=1673079&r1=1673078&r2=1673079&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaAsyncMessageManager.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaAsyncMessageManager.java Mon Apr 13 01:00:06 2015
@@ -91,6 +91,8 @@ public class TestHamaAsyncMessageManager
       bundle.addMessage(it.next());
     }
 
+    bundle.finishAddition();
+    
     messageManager.transfer(peer, bundle);
 
     messageManager.clearOutgoingMessages();

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaMessageManager.java?rev=1673079&r1=1673078&r2=1673079&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaMessageManager.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaMessageManager.java Mon Apr 13 01:00:06 2015
@@ -90,7 +90,8 @@ public class TestHamaMessageManager exte
     while (it.hasNext()) {
       bundle.addMessage(it.next());
     }
-
+    bundle.finishAddition();
+    
     messageManager.transfer(peer, bundle);
 
     messageManager.clearOutgoingMessages();

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/message/compress/TestBSPMessageCompressor.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/compress/TestBSPMessageCompressor.java?rev=1673079&r1=1673078&r2=1673079&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/message/compress/TestBSPMessageCompressor.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/message/compress/TestBSPMessageCompressor.java Mon Apr 13 01:00:06 2015
@@ -50,6 +50,8 @@ public class TestBSPMessageCompressor ex
       a.addMessage(new IntWritable(i));
     }
 
+    a.finishAddition();
+    
     ByteArrayOutputStream byteBuffer = new ByteArrayOutputStream();
     DataOutputStream bufferDos = new DataOutputStream(byteBuffer);
     a.write(bufferDos);

Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/BipartiteMatching.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/BipartiteMatching.java?rev=1673079&r1=1673078&r2=1673079&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/BipartiteMatching.java (original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/BipartiteMatching.java Mon Apr 13 01:00:06 2015
@@ -191,8 +191,12 @@ public final class BipartiteMatching {
     job.setEdgeValueClass(NullWritable.class);
 
     job.setInputFormat(TextInputFormat.class);
+    job.setInputKeyClass(LongWritable.class);
+    job.setInputValueClass(Text.class);
+    
     job.setVertexInputReaderClass(BipartiteMatchingVertexReader.class);
     job.setPartitioner(HashPartitioner.class);
+    
     job.setOutputFormat(TextOutputFormat.class);
     job.setOutputKeyClass(Text.class);
     job.setOutputValueClass(TextPair.class);

Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/DynamicGraph.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/DynamicGraph.java?rev=1673079&r1=1673078&r2=1673079&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/DynamicGraph.java (original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/DynamicGraph.java Mon Apr 13 01:00:06 2015
@@ -153,7 +153,9 @@ public class DynamicGraph {
     graphJob.setEdgeValueClass(NullWritable.class);
 
     graphJob.setInputFormat(TextInputFormat.class);
-
+    graphJob.setInputKeyClass(LongWritable.class);
+    graphJob.setInputValueClass(Text.class);
+    
     graphJob.setVertexInputReaderClass(GraphTextReader.class);
     graphJob.setPartitioner(HashPartitioner.class);
 

Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/KCore.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/KCore.java?rev=1673079&r1=1673078&r2=1673079&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/KCore.java (original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/KCore.java Mon Apr 13 01:00:06 2015
@@ -23,7 +23,9 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
 import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.TextInputFormat;
 import org.apache.hama.graph.GraphJob;
 import org.apache.hama.ml.kcore.KCoreMessage;
 import org.apache.hama.ml.kcore.KCoreVertex;
@@ -50,6 +52,10 @@ public class KCore {
     graphJob.setOutputKeyClass(LongWritable.class);
     graphJob.setOutputValueClass(IntWritable.class);
 
+    graphJob.setInputFormat(TextInputFormat.class);
+    graphJob.setInputKeyClass(LongWritable.class);
+    graphJob.setInputValueClass(Text.class);
+    
     return graphJob;
   }
 

Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java?rev=1673079&r1=1673078&r2=1673079&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java (original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java Mon Apr 13 01:00:06 2015
@@ -135,6 +135,8 @@ public class PageRank {
     pageJob.setEdgeValueClass(NullWritable.class);
 
     pageJob.setInputFormat(SequenceFileInputFormat.class);
+    pageJob.setInputKeyClass(Text.class);
+    pageJob.setInputValueClass(TextArrayWritable.class);
 
     pageJob.setPartitioner(HashPartitioner.class);
     pageJob.setOutputFormat(TextOutputFormat.class);

Modified: hama/trunk/examples/src/test/java/org/apache/hama/examples/BipartiteMatchingTest.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/test/java/org/apache/hama/examples/BipartiteMatchingTest.java?rev=1673079&r1=1673078&r2=1673079&view=diff
==============================================================================
--- hama/trunk/examples/src/test/java/org/apache/hama/examples/BipartiteMatchingTest.java (original)
+++ hama/trunk/examples/src/test/java/org/apache/hama/examples/BipartiteMatchingTest.java Mon Apr 13 01:00:06 2015
@@ -108,7 +108,7 @@ public class BipartiteMatchingTest exten
 
   private void verifyResult() throws IOException {
     FileStatus[] files = fs.globStatus(new Path(OUTPUT + "/part-*"));
-    assertTrue("Not enough files found: " + files.length, files.length == 2);
+    assertTrue("Not enough files found: " + files.length, files.length == 1);
 
     for (FileStatus file : files) {
       if (file.getLen() > 0) {

Modified: hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java?rev=1673079&r1=1673078&r2=1673079&view=diff
==============================================================================
--- hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java (original)
+++ hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java Mon Apr 13 01:00:06 2015
@@ -79,7 +79,7 @@ public class PageRankTest extends TestCa
 
   private void generateTestData() {
     try {
-      FastGraphGen.main(new String[] { "400", "10", INPUT, "3" });
+      FastGraphGen.main(new String[] { "60", "3", INPUT, "3" });
     } catch (Exception e) {
       e.printStackTrace();
     }

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java?rev=1673079&r1=1673078&r2=1673079&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java Mon Apr 13 01:00:06 2015
@@ -46,6 +46,7 @@ public final class GraphJobMessage imple
   public static final int MAP_FLAG = 0x01;
   public static final int VERTEX_FLAG = 0x02;
   public static final int VERTICES_SIZE_FLAG = 0x04;
+  public static final int PARTITION_FLAG = 0x08;
 
   // default flag to -1 "unknown"
   private int flag = -1;
@@ -54,7 +55,8 @@ public final class GraphJobMessage imple
   private WritableComparable vertexId;
   private IntWritable integerMessage;
   private static GraphJobMessageComparator comparator;
-
+  private Vertex<?, ?, ?> vertex;
+  
   private int numOfValues = 0;
 
   private final ByteArrayOutputStream byteBuffer = new ByteArrayOutputStream();
@@ -106,6 +108,10 @@ public final class GraphJobMessage imple
     return map;
   }
 
+  public Vertex<?, ?, ?> getVertex() {
+    return vertex;
+  }
+  
   public WritableComparable<?> getVertexId() {
     return vertexId;
   }
@@ -132,13 +138,24 @@ public final class GraphJobMessage imple
       byteBuffer.write(a.toByteArray());
       numOfValues++;
     } catch (IOException e) {
+      // TODO Auto-generated catch block
       e.printStackTrace();
     }
   }
 
   public void addAll(List<Writable> values) {
-    for (Writable v : values)
-      add(v);
+    ByteArrayOutputStream a = new ByteArrayOutputStream();
+    DataOutputStream b = new DataOutputStream(a);
+    try {
+      for (Writable v : values) {
+        v.write(b);
+      }
+
+      byteBuffer.write(a.toByteArray());
+      numOfValues += values.size();
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
   }
 
   public int getNumOfValues() {
@@ -150,6 +167,11 @@ public final class GraphJobMessage imple
     this.integerMessage = size;
   }
 
+  public GraphJobMessage(Vertex<?, ?, ?> vertex) {
+    this.flag = PARTITION_FLAG;
+    this.vertex = vertex;
+  }
+
   @Override
   public void write(DataOutput out) throws IOException {
     out.writeByte(this.flag);
@@ -165,6 +187,8 @@ public final class GraphJobMessage imple
       map.write(out);
     } else if (isVerticesSizeMessage()) {
       integerMessage.write(out);
+    } else if (isPartitioningMessage()) {
+      vertex.write(out);
     } else {
       vertexId.write(out);
     }
@@ -210,6 +234,9 @@ public final class GraphJobMessage imple
     } else if (isVerticesSizeMessage()) {
       integerMessage = new IntWritable();
       integerMessage.readFields(in);
+    } else if (isPartitioningMessage()) {
+      vertex = (Vertex<?, ?, ?>) ReflectionUtils.newInstance(GraphJobRunner.VERTEX_CLASS, null);
+      vertex.readFields(in);
     } else {
       vertexId = ReflectionUtils.newInstance(GraphJobRunner.VERTEX_ID_CLASS,
           null);
@@ -255,6 +282,10 @@ public final class GraphJobMessage imple
     return flag == VERTICES_SIZE_FLAG;
   }
 
+  public boolean isPartitioningMessage() {
+    return flag == PARTITION_FLAG;
+  }
+  
   @Override
   public String toString() {
     if (isVertexMessage()) {
@@ -265,7 +296,7 @@ public final class GraphJobMessage imple
       return "#Vertices: " + integerMessage;
     } else {
       return "GraphJobMessage [flag=" + flag + ", map=" + map + ", vertexId="
-          + vertexId + ", vertexValue=" + numOfValues + "]";
+          + vertexId + ", vertexValue=" + numOfValues + ", " + vertex.toString() + "]";
     }
   }
 
@@ -308,14 +339,14 @@ public final class GraphJobMessage imple
     }
   }
 
-
   public Iterable<Writable> getIterableMessages() {
 
     return new Iterable<Writable>() {
       @Override
       public Iterator<Writable> iterator() {
         return new Iterator<Writable>() {
-          ByteArrayInputStream bis = new ByteArrayInputStream(byteBuffer.toByteArray());
+          ByteArrayInputStream bis = new ByteArrayInputStream(
+              byteBuffer.toByteArray());
           DataInputStream dis = new DataInputStream(bis);
           int index = 0;
 
@@ -344,5 +375,4 @@ public final class GraphJobMessage imple
     };
   }
 
-
 }

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1673079&r1=1673078&r2=1673079&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Mon Apr 13 01:00:06 2015
@@ -18,9 +18,11 @@
 package org.apache.hama.graph;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map.Entry;
 import java.util.Set;
 
@@ -39,12 +41,12 @@ import org.apache.hama.bsp.BSP;
 import org.apache.hama.bsp.BSPPeer;
 import org.apache.hama.bsp.HashPartitioner;
 import org.apache.hama.bsp.Partitioner;
-import org.apache.hama.bsp.PartitioningRunner.DefaultRecordConverter;
-import org.apache.hama.bsp.PartitioningRunner.RecordConverter;
 import org.apache.hama.bsp.sync.SyncException;
 import org.apache.hama.commons.util.KeyValuePair;
 import org.apache.hama.util.ReflectionUtils;
 
+import com.google.common.collect.Lists;
+
 /**
  * Fully generic graph job runner.
  * 
@@ -100,7 +102,7 @@ public final class GraphJobRunner<V exte
   private long numberVertices = 0;
   // -1 is deactivated
   private int maxIteration = -1;
-  private long iteration;
+  private long iteration = 0;
 
   private AggregationRunner<V, E, M> aggregationRunner;
   private VertexOutputWriter<Writable, Writable, V, E, M> vertexOutputWriter;
@@ -114,12 +116,20 @@ public final class GraphJobRunner<V exte
 
     setupFields(peer);
 
+    long startTime = System.currentTimeMillis();
     loadVertices(peer);
+    LOG.info("Total time spent for loading vertices: "
+        + (System.currentTimeMillis() - startTime) + " ms");
 
+    startTime = System.currentTimeMillis();
     countGlobalVertexCount(peer);
+    LOG.info("Total time spent for broadcasting global vertex count: "
+        + (System.currentTimeMillis() - startTime) + " ms");
 
+    startTime = System.currentTimeMillis();
     doInitialSuperstep(peer);
-
+    LOG.info("Total time spent for initial superstep: "
+        + (System.currentTimeMillis() - startTime) + " ms");
   }
 
   @Override
@@ -137,15 +147,23 @@ public final class GraphJobRunner<V exte
 
       // note that the messages must be parsed here
       GraphJobMessage firstVertexMessage = parseMessages(peer);
+
+      long startTime = System.currentTimeMillis();
       // master/slaves needs to update
       doAggregationUpdates(peer);
+      LOG.info("Total time spent for broadcasting aggregation values: "
+          + (System.currentTimeMillis() - startTime) + " ms");
+
       // check if updated changed by our aggregators
       if (!updated) {
         break;
       }
 
       // loop over vertices and do their computation
+      startTime = System.currentTimeMillis();
       doSuperstep(firstVertexMessage, peer);
+      LOG.info("Total time spent for " + peer.getSuperstepCount()
+          + " superstep: " + (System.currentTimeMillis() - startTime) + " ms");
 
       if (isMasterTask(peer)) {
         peer.getCounter(GraphJobCounter.ITERATIONS).increment(1);
@@ -249,7 +267,7 @@ public final class GraphJobRunner<V exte
 
       currentMessage = peer.getCurrentMessage();
     }
-
+    
     for (V v : notComputedVertices) {
       vertex = vertices.get(v);
       if (!vertex.isHalted()) {
@@ -275,16 +293,23 @@ public final class GraphJobRunner<V exte
     this.changedVertexCnt = 0;
     vertices.startSuperstep();
 
-    Iterator<Vertex<V, E, M>> iterator = vertices.iterator();
-
-    while (iterator.hasNext()) {
-      Vertex<V, E, M> vertex = iterator.next();
-
-      // Calls setup method.
-      vertex.setup(conf);
-
-      vertex.compute(Collections.singleton(vertex.getValue()));
-      vertices.finishVertexComputation(vertex);
+    List<Thread> runners = new ArrayList<Thread>();
+    List<Vertex<V, E, M>> v = new ArrayList<Vertex<V, E, M>>(
+        vertices.getValues());
+    for (List<Vertex<V, E, M>> partition : Lists.partition(v, conf.getInt("hama.graph.thread.num", 30))) {
+      runners.add(new Computer(partition));
+    }
+
+    for (Thread computer : runners) {
+      computer.start();
+    }
+
+    for (Thread computer : runners) {
+      try {
+        computer.join();
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
     }
 
     vertices.finishSuperstep();
@@ -292,6 +317,27 @@ public final class GraphJobRunner<V exte
     iteration++;
   }
 
+  class Computer extends Thread {
+    List<Vertex<V, E, M>> partition;
+
+    public Computer(List<Vertex<V, E, M>> partition) {
+      this.partition = partition;
+    }
+
+    @Override
+    public void run() {
+      try {
+        for (Vertex<V, E, M> v : partition) {
+          v.setup(conf);
+          v.compute(Collections.singleton(v.getValue()));
+          vertices.finishVertexComputation(v);
+        }
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+    }
+  }
+
   @SuppressWarnings("unchecked")
   private void setupFields(
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
@@ -350,44 +396,33 @@ public final class GraphJobRunner<V exte
   private void loadVertices(
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
       throws IOException, SyncException, InterruptedException {
-    RecordConverter converter = org.apache.hadoop.util.ReflectionUtils
+    final VertexInputReader<Writable, Writable, V, E, M> reader = (VertexInputReader<Writable, Writable, V, E, M>) ReflectionUtils
         .newInstance(conf.getClass(Constants.RUNTIME_PARTITION_RECORDCONVERTER,
-            DefaultRecordConverter.class, RecordConverter.class), conf);
-
-    // our VertexInputReader ensures incoming vertices are sorted by their ID
-    Vertex<V, E, M> vertex = GraphJobRunner
-        .<V, E, M> newVertexInstance(VERTEX_CLASS);
-    Vertex<V, E, M> currentVertex = GraphJobRunner
-        .<V, E, M> newVertexInstance(VERTEX_CLASS);
-
-    KeyValuePair<Writable, Writable> record = null;
-    KeyValuePair<Writable, Writable> converted = null;
-
-    while ((record = peer.readNext()) != null) {
-      converted = converter.convertRecord(record, conf);
-      currentVertex = (Vertex<V, E, M>) converted.getValue();
-
-      if (vertex.getVertexID() == null) {
-        vertex = currentVertex;
-      } else {
-        if (vertex.getVertexID().equals(currentVertex.getVertexID())) {
-          for (Edge<V, E> edge : currentVertex.getEdges()) {
-            vertex.addEdge(edge);
-          }
-        } else {
-          if (vertex.compareTo(currentVertex) > 0) {
-            throw new IOException(
-                "The records of split aren't in order by vertex ID.");
-          }
+            VertexInputReader.class));
 
-          addVertex(vertex);
-          vertex = currentVertex;
+    try {
+      KeyValuePair<Writable, Writable> next = null;
+      while ((next = peer.readNext()) != null) {
+        Vertex<V, E, M> vertex = GraphJobRunner
+            .<V, E, M> newVertexInstance(VERTEX_CLASS);
+
+        boolean vertexFinished = reader.parseVertex(next.getKey(),
+            next.getValue(), vertex);
+        if (!vertexFinished) {
+          continue;
         }
+        peer.send(getHostName(vertex.getVertexID()),
+            new GraphJobMessage(vertex));
       }
+    } catch (Exception e) {
+      e.printStackTrace();
     }
-    // add last vertex.
-    addVertex(vertex);
+    peer.sync();
 
+    GraphJobMessage received;
+    while ((received = peer.getCurrentMessage()) != null) {
+      addVertex((Vertex<V, E, M>) received.getVertex());
+    }
     LOG.info(vertices.size() + " vertices are loaded into "
         + peer.getPeerName());
     LOG.debug("Starting Vertex processing!");
@@ -403,6 +438,7 @@ public final class GraphJobRunner<V exte
       vertex.addEdge(new Edge<V, E>(vertex.getVertexID(), null));
     }
 
+    vertex.setRunner(this);
     vertices.put(vertex);
 
     LOG.debug("Added VertexID: " + vertex.getVertexID() + " in peer "
@@ -535,6 +571,20 @@ public final class GraphJobRunner<V exte
     vertices.finishSuperstep();
   }
 
+  public void sendMessage(V dstinationVertexID, M msg) throws IOException {
+    peer.send(getHostName(dstinationVertexID), new GraphJobMessage(
+        dstinationVertexID, msg));
+  }
+
+  /**
+   * @return the destination peer name of the destination of the given directed
+   *         edge.
+   */
+  public String getHostName(V vertexID) {
+    return peer.getPeerName(getPartitioner().getPartition(vertexID, null,
+        peer.getNumPeers()));
+  }
+
   /**
    * @return the number of vertices, globally accumulated.
    */

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java?rev=1673079&r1=1673078&r2=1673079&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java Mon Apr 13 01:00:06 2015
@@ -73,7 +73,7 @@ public class IncomingVertexMessageManage
   public void add(GraphJobMessage item) {
     if (item.isVertexMessage()) {
       msgPerVertex.add(item.getVertexId(), item);
-    } else if (item.isMapMessage() || item.isVerticesSizeMessage()) {
+    } else {
       mapMessages.add(item);
     }
   }

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java?rev=1673079&r1=1673078&r2=1673079&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java Mon Apr 13 01:00:06 2015
@@ -18,6 +18,7 @@
 package org.apache.hama.graph;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -40,19 +41,21 @@ import org.apache.hama.bsp.TaskAttemptID
  */
 public final class MapVerticesInfo<V extends WritableComparable<V>, E extends Writable, M extends Writable>
     implements VerticesInfo<V, E, M> {
-  private GraphJobRunner<V, E, M> runner;
-
   private final Map<V, Vertex<V, E, M>> vertices = new HashMap<V, Vertex<V, E, M>>();
 
   @Override
   public void init(GraphJobRunner<V, E, M> runner, HamaConfiguration conf,
       TaskAttemptID attempt) throws IOException {
-    this.runner = runner;
   }
 
   @Override
   public void put(Vertex<V, E, M> vertex) throws IOException {
-    vertices.put(vertex.getVertexID(), vertex);
+    if(vertices.containsKey(vertex.getVertexID())) {
+      for(Edge<V, E> e : vertex.getEdges()) 
+      vertices.get(vertex.getVertexID()).addEdge(e);
+    } else {
+      vertices.put(vertex.getVertexID(), vertex);
+    }
   }
 
   @Override
@@ -65,15 +68,18 @@ public final class MapVerticesInfo<V ext
   }
 
   @Override
+  public Collection<Vertex<V, E, M>> getValues() {
+    return vertices.values();  
+  }
+  
+  @Override
   public int size() {
     return vertices.size();
   }
 
   @Override
   public Vertex<V, E, M> get(V vertexID) {
-    Vertex<V, E, M> vertex = vertices.get(vertexID);
-    vertex.setRunner(runner);
-    return vertex;
+    return vertices.get(vertexID);
   }
 
   @Override
@@ -90,9 +96,7 @@ public final class MapVerticesInfo<V ext
 
       @Override
       public Vertex<V, E, M> next() {
-        Vertex<V, E, M> vertex = vertexIterator.next();
-        vertex.setRunner(runner);
-        return vertex;
+        return vertexIterator.next();
       }
 
       @Override
@@ -111,7 +115,7 @@ public final class MapVerticesInfo<V ext
   @Override
   public void finishVertexComputation(Vertex<V, E, M> vertex)
       throws IOException {
-    vertices.put(vertex.getVertexID(), vertex);
+    // do nothing
   }
 
   @Override

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java?rev=1673079&r1=1673078&r2=1673079&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java Mon Apr 13 01:00:06 2015
@@ -108,12 +108,9 @@ public class OutgoingVertexMessageManage
 
         MessagePerVertex msgStorage = storage.get(bundle.getKey());
         if (msgStorage != null) {
-          Iterator<GraphJobMessage> it = msgStorage.iterator();
-          while (it.hasNext()) {
-            bundle.getValue().addMessage(it.next());
-          }
+          bundle.getValue().addMessages(msgStorage.iterator());
         }
-
+        bundle.getValue().finishAddition();
         storage.remove(bundle.getKey());
         return bundle;
       }

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java?rev=1673079&r1=1673078&r2=1673079&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java Mon Apr 13 01:00:06 2015
@@ -76,28 +76,14 @@ public abstract class Vertex<V extends W
 
   @Override
   public void sendMessage(Edge<V, E> e, M msg) throws IOException {
-    runner.getPeer().send(getDestinationPeerName(e),
-        new GraphJobMessage(e.getDestinationVertexID(), msg));
+    runner.sendMessage(e.getDestinationVertexID(), msg);
   }
 
-  /**
-   * @return the destination peer name of the destination of the given directed
-   *         edge.
-   */
-  public String getDestinationPeerName(Edge<V, E> edge) {
-    return getDestinationPeerName(edge.getDestinationVertexID());
-  }
-
-  /**
-   * @return the destination peer name of the given vertex id, determined by the
-   *         partitioner.
-   */
-  public String getDestinationPeerName(V vertexId) {
-    return runner.getPeer().getPeerName(
-        getPartitioner().getPartition(vertexId, value,
-            runner.getPeer().getNumPeers()));
+  @Override
+  public void sendMessage(V destinationVertexID, M msg) throws IOException {
+    runner.sendMessage(destinationVertexID, msg);
   }
-
+  
   @Override
   public void sendMessageToNeighbors(M msg) throws IOException {
     final List<Edge<V, E>> outEdges = this.getEdges();
@@ -106,15 +92,6 @@ public abstract class Vertex<V extends W
     }
   }
 
-  @Override
-  public void sendMessage(V destinationVertexID, M msg) throws IOException {
-    int partition = getPartitioner().getPartition(destinationVertexID, msg,
-        runner.getPeer().getNumPeers());
-    String destPeer = runner.getPeer().getAllPeerNames()[partition];
-    runner.getPeer().send(destPeer,
-        new GraphJobMessage(destinationVertexID, msg));
-  }
-
   private void alterVertexCounter(int i) throws IOException {
     this.runner.setChangedVertexCnt(this.runner.getChangedVertexCnt() + i);
   }

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java?rev=1673079&r1=1673078&r2=1673079&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java Mon Apr 13 01:00:06 2015
@@ -18,6 +18,7 @@
 package org.apache.hama.graph;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.Set;
 
@@ -79,6 +80,8 @@ public interface VerticesInfo<V extends
    */
   public Set<V> keySet();
   
+  public Collection<Vertex<V, E, M>> getValues();
+
   /**
    * Finish the additions, from this point on the implementations should close
    * the adds and throw exceptions in case something is added after this call.
@@ -101,4 +104,5 @@ public interface VerticesInfo<V extends
    */
   public void finishSuperstep() throws IOException;
 
+
 }

Modified: hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java?rev=1673079&r1=1673078&r2=1673079&view=diff
==============================================================================
--- hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java (original)
+++ hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java Mon Apr 13 01:00:06 2015
@@ -62,7 +62,6 @@ public class TestSubmitGraphJob extends
   public void setUp() throws Exception {
     super.setUp();
     vi.add(MapVerticesInfo.class);
-    vi.add(OffHeapVerticesInfo.class);
   }
 
   @Override
@@ -119,7 +118,7 @@ public class TestSubmitGraphJob extends
   @SuppressWarnings("rawtypes")
   protected void injectVerticesInfo() {
     Class<? extends VerticesInfo> verticesInfoClass = vi.get(Math
-        .abs(new Random().nextInt() % 2));
+        .abs(new Random().nextInt() % 1));
     LOG.info("using vertices info of type : " + verticesInfoClass.getName());
   }
 

Modified: hama/trunk/pom.xml
URL: http://svn.apache.org/viewvc/hama/trunk/pom.xml?rev=1673079&r1=1673078&r2=1673079&view=diff
==============================================================================
--- hama/trunk/pom.xml (original)
+++ hama/trunk/pom.xml Mon Apr 13 01:00:06 2015
@@ -126,24 +126,6 @@
 
   <profiles>
     <profile>
-      <id>doclint-java8-disable</id>
-      <activation>
-        <jdk>[1.8,)</jdk>
-      </activation>
-      <build>
-        <plugins>
-          <plugin>
-            <groupId>org.apache.maven.plugins</groupId>
-            <artifactId>maven-javadoc-plugin</artifactId>
-            <configuration>
-              <additionalparam>-Xdoclint:none</additionalparam>
-            </configuration>
-          </plugin>
-        </plugins>
-      </build>
-    </profile>
-
-    <profile>
       <id>hadoop1</id>
       
       <modules>
@@ -493,6 +475,7 @@
         <version>2.9.1</version>
         <configuration>
           <aggregate>true</aggregate>
+          <failOnError>false</failOnError>
           <outputDirectory>docs/apidocs</outputDirectory>
         </configuration>
         <executions>