You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by tj...@apache.org on 2012/05/19 10:32:29 UTC

svn commit: r1340371 - in /incubator/hama/trunk: ./ core/src/main/java/org/apache/hama/bsp/message/ core/src/main/java/org/apache/hama/bsp/message/compress/ core/src/test/java/org/apache/hama/ core/src/test/java/org/apache/hama/bsp/ core/src/test/java/...

Author: tjungblut
Date: Sat May 19 08:32:28 2012
New Revision: 1340371

URL: http://svn.apache.org/viewvc?rev=1340371&view=rev
Log:
[HAMA-566]: Add a disk-based queue

Added:
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/SingleLockQueue.java
Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroBSPMessageBundle.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/DiskQueue.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/SynchronizedQueue.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/Bzip2Compressor.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/SnappyCompressor.java
    incubator/hama/trunk/core/src/test/java/org/apache/hama/MiniZooKeeperCluster.java
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestClusterStatus.java
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestAvroMessageManager.java
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1340371&r1=1340370&r2=1340371&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Sat May 19 08:32:28 2012
@@ -4,6 +4,7 @@ Release 0.5 - April 10, 2012 
 
   NEW FEATURES
 
+   HAMA-566: Add disk-based queue (tjungblut)
    HAMA-552: Add a sorted message queue (tjungblut)   
    HAMA-556: Graph package to support stopping the interations when the node changes are within the tolerance value as in the case of page rank (tjungblut)
    HAMA-508: Add clean plugin (Mikalai Parafeniuk via edwardyoon)

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java?rev=1340371&r1=1340370&r2=1340371&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java Sat May 19 08:32:28 2012
@@ -28,6 +28,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hama.bsp.BSPPeer;
@@ -35,10 +36,15 @@ import org.apache.hama.bsp.BSPPeerImpl;
 import org.apache.hama.bsp.TaskAttemptID;
 import org.apache.hama.util.BSPNetUtils;
 
+/**
+ * Abstract base class that contains all information and services needed by
+ * the concrete RPC subclasses. For example, it manages the message queues
+ * and maintains a cache of socket addresses.
+ */
 public abstract class AbstractMessageManager<M extends Writable> implements
     MessageManager<M>, Configurable {
 
-  private static final Log LOG = LogFactory
+  protected static final Log LOG = LogFactory
       .getLog(AbstractMessageManager.class);
 
   // conf is injected via reflection of the factory
@@ -48,12 +54,19 @@ public abstract class AbstractMessageMan
   protected MessageQueue<M> localQueue;
   // this must be a synchronized implementation: this is accessed per RPC
   protected SynchronizedQueue<M> localQueueForNextIteration;
+  // this peer object is only used to increment counters
   protected BSPPeer<?, ?, ?, ?, M> peer;
   // the peer address of this peer
   protected InetSocketAddress peerAddress;
   // the task attempt id
   protected TaskAttemptID attemptId;
 
+  /*
+   * (non-Javadoc)
+   * @see org.apache.hama.bsp.message.MessageManager#init(org.apache.hama.bsp.
+   * TaskAttemptID, org.apache.hama.bsp.BSPPeer,
+   * org.apache.hadoop.conf.Configuration, java.net.InetSocketAddress)
+   */
   @Override
   public void init(TaskAttemptID attemptId, BSPPeer<?, ?, ?, ?, M> peer,
       Configuration conf, InetSocketAddress peerAddress) {
@@ -62,11 +75,13 @@ public abstract class AbstractMessageMan
     this.conf = conf;
     this.peerAddress = peerAddress;
     localQueue = getQueue();
-    localQueue.init(conf, attemptId);
     localQueueForNextIteration = getSynchronizedQueue();
-    localQueueForNextIteration.init(conf, attemptId);
   }
 
+  /*
+   * (non-Javadoc)
+   * @see org.apache.hama.bsp.message.MessageManager#close()
+   */
   @Override
   public void close() {
     Collection<MessageQueue<M>> values = outgoingQueues.values();
@@ -74,8 +89,21 @@ public abstract class AbstractMessageMan
       msgQueue.close();
     }
     localQueue.close();
+    // remove possible disk queues from the path
+    try {
+      FileSystem.get(conf).delete(
+          DiskQueue.getQueueDir(conf, attemptId,
+              conf.get(DiskQueue.DISK_QUEUE_PATH_KEY)), true);
+    } catch (IOException e) {
+      LOG.warn("Queue dir couldn't be deleted");
+    }
+
   }
 
+  /*
+   * (non-Javadoc)
+   * @see org.apache.hama.bsp.message.MessageManager#finishSendPhase()
+   */
   @Override
   public void finishSendPhase() throws IOException {
     Collection<MessageQueue<M>> values = outgoingQueues.values();
@@ -84,29 +112,42 @@ public abstract class AbstractMessageMan
     }
   }
 
+  /*
+   * (non-Javadoc)
+   * @see org.apache.hama.bsp.message.MessageManager#getCurrentMessage()
+   */
   @Override
   public final M getCurrentMessage() throws IOException {
     return localQueue.poll();
   }
 
+  /*
+   * (non-Javadoc)
+   * @see org.apache.hama.bsp.message.MessageManager#getNumCurrentMessages()
+   */
   @Override
   public final int getNumCurrentMessages() {
     return localQueue.size();
   }
 
+  /*
+   * (non-Javadoc)
+   * @see org.apache.hama.bsp.message.MessageManager#clearOutgoingQueues()
+   */
   @Override
   public final void clearOutgoingQueues() {
-    this.outgoingQueues.clear();
-    localQueueForNextIteration.prepareRead();
-    localQueue.prepareWrite();
-    localQueue.addAll(localQueueForNextIteration.getMessageQueue());
+    localQueue = localQueueForNextIteration.getMessageQueue();
     localQueue.prepareRead();
-    localQueueForNextIteration.clear();
+    localQueueForNextIteration = getSynchronizedQueue();
   }
 
+  /*
+   * (non-Javadoc)
+   * @see org.apache.hama.bsp.message.MessageManager#send(java.lang.String,
+   * org.apache.hadoop.io.Writable)
+   */
   @Override
   public void send(String peerName, M msg) throws IOException {
-    LOG.debug("Send message (" + msg.toString() + ") to " + peerName);
     InetSocketAddress targetPeerAddress = null;
     // Get socket for target peer.
     if (peerSocketCache.containsKey(peerName)) {
@@ -124,6 +165,10 @@ public abstract class AbstractMessageMan
     outgoingQueues.put(targetPeerAddress, queue);
   }
 
+  /*
+   * (non-Javadoc)
+   * @see org.apache.hama.bsp.message.MessageManager#getMessageIterator()
+   */
   @Override
   public final Iterator<Entry<InetSocketAddress, MessageQueue<M>>> getMessageIterator() {
     return this.outgoingQueues.entrySet().iterator();
@@ -132,12 +177,14 @@ public abstract class AbstractMessageMan
   /**
    * Returns a new queue implementation based on what was configured. If nothing
    * has been configured for "hama.messenger.queue.class" then the
-   * {@link MemoryQueue} is used.
+   * {@link MemoryQueue} is used. If you run into scalability issues, consider
+   * using {@link DiskQueue} instead.
    * 
    * @return a <b>new</b> queue implementation.
    */
   protected MessageQueue<M> getQueue() {
     Class<?> queueClass = conf.getClass(QUEUE_TYPE_CLASS, MemoryQueue.class);
+    LOG.debug("Creating new " + queueClass);
     @SuppressWarnings("unchecked")
     MessageQueue<M> newInstance = (MessageQueue<M>) ReflectionUtils
         .newInstance(queueClass, conf);
@@ -146,13 +193,15 @@ public abstract class AbstractMessageMan
   }
 
   protected SynchronizedQueue<M> getSynchronizedQueue() {
-    return SynchronizedQueue.synchronize(getQueue());
+    return SingleLockQueue.synchronize(getQueue());
   }
 
+  @Override
   public final Configuration getConf() {
     return conf;
   }
 
+  @Override
   public final void setConf(Configuration conf) {
     this.conf = conf;
   }

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroBSPMessageBundle.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroBSPMessageBundle.java?rev=1340371&r1=1340370&r2=1340371&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroBSPMessageBundle.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroBSPMessageBundle.java Sat May 19 08:32:28 2012
@@ -31,11 +31,13 @@ public final class AvroBSPMessageBundle<
   @Deprecated
   public java.nio.ByteBuffer data;
 
+  @Override
   public final org.apache.avro.Schema getSchema() {
     return SCHEMA$;
   }
 
   // Used by DatumWriter. Applications should not call.
+  @Override
   public final java.lang.Object get(int field$) {
     switch (field$) {
       case 0:
@@ -46,6 +48,7 @@ public final class AvroBSPMessageBundle<
   }
 
   // Used by DatumReader. Applications should not call.
+  @Override
   public final void put(int field$, java.lang.Object value$) {
     switch (field$) {
       case 0:
@@ -116,12 +119,12 @@ public final class AvroBSPMessageBundle<
     private Builder(AvroBSPMessageBundle<?> other) {
       super(AvroBSPMessageBundle.SCHEMA$);
       if (isValidValue(fields[0], other.data)) {
-        data = (java.nio.ByteBuffer) clone(other.data);
+        data = clone(other.data);
         fieldSetFlags[0] = true;
       }
     }
 
-    public final ByteBuffer clone(ByteBuffer original) {
+    public final static ByteBuffer clone(ByteBuffer original) {
       ByteBuffer clone = ByteBuffer.allocate(original.capacity());
       original.rewind();
       clone.put(original);

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java?rev=1340371&r1=1340370&r2=1340371&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java Sat May 19 08:32:28 2012
@@ -81,7 +81,7 @@ public final class AvroMessageManagerImp
 
     if (sender == null) {
       NettyTransceiver client = new NettyTransceiver(addr);
-      sender = (Sender<M>) SpecificRequestor.getClient(Sender.class, client);
+      sender = SpecificRequestor.getClient(Sender.class, client);
       peers.put(addr, sender);
     }
 

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/DiskQueue.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/DiskQueue.java?rev=1340371&r1=1340370&r2=1340371&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/DiskQueue.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/DiskQueue.java Sat May 19 08:32:28 2012
@@ -24,41 +24,45 @@ import java.util.Iterator;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.ObjectWritable;
-import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Writable;
 import org.apache.hama.bsp.TaskAttemptID;
 
 /**
- * A disk based queue that is backed by a sequencefile. <br/>
+ * A disk based queue that is backed by a raw file on local disk. <br/>
  * Structure is as follows: <br/>
  * If "bsp.disk.queue.dir" is not defined, "hama.tmp.dir" will be used instead. <br/>
  * ${hama.tmp.dir}/diskqueue/job_id/task_attempt_id/ <br/>
  * An ongoing sequencenumber will be appended to prevent inner collisions,
  * however the job_id dir will never be deleted. So you need a cronjob to do the
  * cleanup for you. <br/>
- * <b>This is currently not intended to be production ready</b>
+ * It is recommended to prefix the configured path with the file:// scheme,
+ * because writes to DFS are expensive. However, your local disk may not have
+ * enough space for your messages, so you can switch per job via your
+ * configuration. <br/>
+ * <b>This queue is experimental.</b>
  */
 public final class DiskQueue<M extends Writable> implements MessageQueue<M> {
 
   public static final String DISK_QUEUE_PATH_KEY = "bsp.disk.queue.dir";
 
-  private final Log LOG = LogFactory.getLog(DiskQueue.class);
+  private static final int MAX_RETRIES = 4;
+  private static final Log LOG = LogFactory.getLog(DiskQueue.class);
 
   private static volatile int ONGOING_SEQUENCE_NUMBER = 0;
-  private static final int MAX_RETRIES = 4;
-  private static final NullWritable NULL_WRITABLE = NullWritable.get();
 
   private int size = 0;
   // injected via reflection
   private Configuration conf;
   private FileSystem fs;
 
-  private SequenceFile.Writer writer;
-  private SequenceFile.Reader reader;
+  private FSDataOutputStream writer;
+  private FSDataInputStream reader;
+
   private Path queuePath;
   private TaskAttemptID id;
   private final ObjectWritable writable = new ObjectWritable();
@@ -71,17 +75,7 @@ public final class DiskQueue<M extends W
       fs = FileSystem.get(conf);
       String configuredQueueDir = conf.get(DISK_QUEUE_PATH_KEY);
       Path queueDir = null;
-      if (configuredQueueDir == null) {
-        String hamaTmpDir = conf.get("hama.tmp.dir");
-        if (hamaTmpDir != null) {
-          queueDir = createDiskQueuePath(id, hamaTmpDir);
-        } else {
-          // use some local tmp dir
-          queueDir = createDiskQueuePath(id, "/tmp/messageStorage/");
-        }
-      } else {
-        queueDir = createDiskQueuePath(id, configuredQueueDir);
-      }
+      queueDir = getQueueDir(conf, id, configuredQueueDir);
       fs.mkdirs(queueDir);
       queuePath = new Path(queueDir, (ONGOING_SEQUENCE_NUMBER++)
           + "_messages.seq");
@@ -109,6 +103,7 @@ public final class DiskQueue<M extends W
   private void closeInternal(boolean delete) {
     try {
       if (writer != null) {
+        writer.flush();
         writer.close();
         writer = null;
       }
@@ -124,6 +119,7 @@ public final class DiskQueue<M extends W
       }
       if (writer != null) {
         try {
+          writer.flush();
           writer.close();
           writer = null;
         } catch (IOException e) {
@@ -155,7 +151,7 @@ public final class DiskQueue<M extends W
     // make sure we've closed
     closeInternal(false);
     try {
-      reader = new SequenceFile.Reader(fs, queuePath, conf);
+      reader = fs.open(queuePath);
     } catch (IOException e) {
       // can't recover from that
       LOG.error(e);
@@ -166,8 +162,7 @@ public final class DiskQueue<M extends W
   @Override
   public void prepareWrite() {
     try {
-      writer = new SequenceFile.Writer(fs, conf, queuePath,
-          ObjectWritable.class, NullWritable.class);
+      writer = fs.create(queuePath);
     } catch (IOException e) {
       // can't recover from that
       LOG.error(e);
@@ -194,7 +189,7 @@ public final class DiskQueue<M extends W
   public final void add(M item) {
     size++;
     try {
-      writer.append(new ObjectWritable(item), NULL_WRITABLE);
+      new ObjectWritable(item).write(writer);
     } catch (IOException e) {
       LOG.error(e);
     }
@@ -210,16 +205,19 @@ public final class DiskQueue<M extends W
   @SuppressWarnings("unchecked")
   @Override
   public final M poll() {
+    if (size == 0) {
+      return null;
+    }
     size--;
     int tries = 1;
     while (tries <= MAX_RETRIES) {
       try {
-        boolean next = reader.next(writable, NULL_WRITABLE);
-        if (next) {
+        writable.readFields(reader);
+        if (size > 0) {
           return (M) writable.get();
         } else {
-          closeInternal(false);
-          return null;
+          closeInternal(true);
+          return (M) writable.get();
         }
       } catch (IOException e) {
         LOG.error("Retrying for the " + tries + "th time!", e);
@@ -277,11 +275,32 @@ public final class DiskQueue<M extends W
   }
 
   /**
+   * Creates a path for a queue
+   */
+  public static Path getQueueDir(Configuration conf, TaskAttemptID id,
+      String configuredQueueDir) {
+    Path queueDir;
+    if (configuredQueueDir == null) {
+      String hamaTmpDir = conf.get("hama.tmp.dir");
+      if (hamaTmpDir != null) {
+        queueDir = createDiskQueuePath(id, hamaTmpDir);
+      } else {
+        // use some local tmp dir
+        queueDir = createDiskQueuePath(id, "/tmp/messageStorage/");
+      }
+    } else {
+      queueDir = createDiskQueuePath(id, configuredQueueDir);
+    }
+    return queueDir;
+  }
+
+  /**
    * Creates a generic Path based on the configured path and the task attempt id
    * to store disk sequence files. <br/>
    * Structure is as follows: ${hama.tmp.dir}/diskqueue/job_id/task_attempt_id/
    */
-  public static Path createDiskQueuePath(TaskAttemptID id, String configuredPath) {
+  private static Path createDiskQueuePath(TaskAttemptID id,
+      String configuredPath) {
     return new Path(new Path(new Path(configuredPath, "diskqueue"), id
         .getJobID().toString()), id.getTaskID().toString());
   }

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java?rev=1340371&r1=1340370&r2=1340371&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java Sat May 19 08:32:28 2012
@@ -32,6 +32,7 @@ import org.apache.hama.bsp.BSPPeer;
 import org.apache.hama.bsp.BSPPeerImpl;
 import org.apache.hama.bsp.TaskAttemptID;
 import org.apache.hama.bsp.message.compress.BSPCompressedBundle;
+import org.apache.hama.ipc.HamaRPCProtocolVersion;
 import org.apache.hama.util.CompressionUtil;
 
 /**
@@ -107,7 +108,7 @@ public final class HadoopMessageManagerI
     HadoopMessageManager<M> peer = peers.get(addr);
     if (peer == null) {
       peer = (HadoopMessageManager<M>) RPC.getProxy(HadoopMessageManager.class,
-          HadoopMessageManager.versionID, addr, this.conf);
+          HamaRPCProtocolVersion.versionID, addr, this.conf);
       this.peers.put(addr, peer);
     }
     return peer;

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/SingleLockQueue.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/SingleLockQueue.java?rev=1340371&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/SingleLockQueue.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/SingleLockQueue.java Sat May 19 08:32:28 2012
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message;
+
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hama.bsp.TaskAttemptID;
+
+/**
+ * A global mutex based synchronized queue.
+ */
+public final class SingleLockQueue<T> implements SynchronizedQueue<T> {
+
+  private final MessageQueue<T> queue;
+  private final Object mutex;
+
+  private SingleLockQueue(MessageQueue<T> queue) {
+    this.queue = queue;
+    this.mutex = new Object();
+  }
+
+  private SingleLockQueue(MessageQueue<T> queue, Object mutex) {
+    this.queue = queue;
+    this.mutex = mutex;
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see org.apache.hama.bsp.message.SynchronizedQueue#iterator()
+   */
+  @Override
+  public Iterator<T> iterator() {
+    synchronized (mutex) {
+      return queue.iterator();
+    }
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see
+   * org.apache.hama.bsp.message.SynchronizedQueue#setConf(org.apache.hadoop
+   * .conf.Configuration)
+   */
+  @Override
+  public void setConf(Configuration conf) {
+    synchronized (mutex) {
+      queue.setConf(conf);
+    }
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see org.apache.hama.bsp.message.SynchronizedQueue#getConf()
+   */
+  @Override
+  public Configuration getConf() {
+    synchronized (mutex) {
+      return queue.getConf();
+    }
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see
+   * org.apache.hama.bsp.message.SynchronizedQueue#init(org.apache.hadoop.conf
+   * .Configuration, org.apache.hama.bsp.TaskAttemptID)
+   */
+  @Override
+  public void init(Configuration conf, TaskAttemptID id) {
+    synchronized (mutex) {
+      queue.init(conf, id);
+    }
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see org.apache.hama.bsp.message.SynchronizedQueue#close()
+   */
+  @Override
+  public void close() {
+    synchronized (mutex) {
+    }
+    queue.close();
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see org.apache.hama.bsp.message.SynchronizedQueue#prepareRead()
+   */
+  @Override
+  public void prepareRead() {
+    synchronized (mutex) {
+      queue.prepareRead();
+    }
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see
+   * org.apache.hama.bsp.message.SynchronizedQueue#addAll(java.util.Collection)
+   */
+  @Override
+  public void addAll(Collection<T> col) {
+    synchronized (mutex) {
+      queue.addAll(col);
+    }
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see org.apache.hama.bsp.message.SynchronizedQueue#add(T)
+   */
+  @Override
+  public void add(T item) {
+    synchronized (mutex) {
+      queue.add(item);
+    }
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see org.apache.hama.bsp.message.SynchronizedQueue#clear()
+   */
+  @Override
+  public void clear() {
+    synchronized (mutex) {
+      queue.clear();
+    }
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see org.apache.hama.bsp.message.SynchronizedQueue#poll()
+   */
+  @Override
+  public Object poll() {
+    synchronized (mutex) {
+      return queue.poll();
+    }
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see org.apache.hama.bsp.message.SynchronizedQueue#size()
+   */
+  @Override
+  public int size() {
+    synchronized (mutex) {
+      return queue.size();
+    }
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see org.apache.hama.bsp.message.SynchronizedQueue#getMessageQueue()
+   */
+  @Override
+  public MessageQueue<T> getMessageQueue() {
+    synchronized (mutex) {
+      return queue;
+    }
+  }
+
+  /*
+   * static constructor methods to be type safe
+   */
+
+  public static <T> SynchronizedQueue<T> synchronize(MessageQueue<T> queue) {
+    return new SingleLockQueue<T>(queue);
+  }
+
+  public static <T> SynchronizedQueue<T> synchronize(MessageQueue<T> queue,
+      Object mutex) {
+    return new SingleLockQueue<T>(queue, mutex);
+  }
+}

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/SynchronizedQueue.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/SynchronizedQueue.java?rev=1340371&r1=1340370&r2=1340371&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/SynchronizedQueue.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/SynchronizedQueue.java Sat May 19 08:32:28 2012
@@ -20,110 +20,34 @@ package org.apache.hama.bsp.message;
 import java.util.Collection;
 import java.util.Iterator;
 
+import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hama.bsp.TaskAttemptID;
 
 /**
- * A global mutex based synchronized queue.
+ * Synchronized queue interface. Can be used to implement better synchronized
+ * data structures.
  */
-public final class SynchronizedQueue<T> {
+public interface SynchronizedQueue<T> extends Configurable {
 
-  private final MessageQueue<T> queue;
-  private final Object mutex;
+  public abstract Iterator<T> iterator();
 
-  private SynchronizedQueue(MessageQueue<T> queue) {
-    this.queue = queue;
-    this.mutex = new Object();
-  }
-
-  private SynchronizedQueue(MessageQueue<T> queue, Object mutex) {
-    this.queue = queue;
-    this.mutex = mutex;
-  }
-
-  public Iterator<T> iterator() {
-    synchronized (mutex) {
-      return queue.iterator();
-    }
-  }
-
-  public void setConf(Configuration conf) {
-    synchronized (mutex) {
-      queue.setConf(conf);
-    }
-  }
-
-  public Configuration getConf() {
-    synchronized (mutex) {
-      return queue.getConf();
-    }
-  }
-
-  public void init(Configuration conf, TaskAttemptID id) {
-    synchronized (mutex) {
-      queue.init(conf, id);
-    }
-  }
-
-  public void close() {
-    synchronized (mutex) {
-    }
-    queue.close();
-  }
-
-  public void prepareRead() {
-    synchronized (mutex) {
-      queue.prepareRead();
-    }
-  }
-
-  public void addAll(Collection<T> col) {
-    synchronized (mutex) {
-      queue.addAll(col);
-    }
-  }
-
-  public void add(T item) {
-    synchronized (mutex) {
-      queue.add(item);
-    }
-  }
-
-  public void clear() {
-    synchronized (mutex) {
-      queue.clear();
-    }
-  }
-
-  public Object poll() {
-    synchronized (mutex) {
-      return queue.poll();
-    }
-  }
-
-  public int size() {
-    synchronized (mutex) {
-      return queue.size();
-    }
-  }
-
-  public MessageQueue<T> getMessageQueue() {
-    synchronized (mutex) {
-      return queue;
-    }
-  }
-
-  /*
-   * static constructor methods to be type safe
-   */
-
-  public static <T> SynchronizedQueue<T> synchronize(MessageQueue<T> queue) {
-    return new SynchronizedQueue<T>(queue);
-  }
-
-  public static <T> SynchronizedQueue<T> synchronize(MessageQueue<T> queue,
-      Object mutex) {
-    return new SynchronizedQueue<T>(queue, mutex);
-  }
+  public abstract void init(Configuration conf, TaskAttemptID id);
+
+  public abstract void close();
+
+  public abstract void prepareRead();
+
+  public abstract void addAll(Collection<T> col);
+
+  public abstract void add(T item);
+
+  public abstract void clear();
+
+  public abstract Object poll();
+
+  public abstract int size();
+
+  public abstract MessageQueue<T> getMessageQueue();
 
 }

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/Bzip2Compressor.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/Bzip2Compressor.java?rev=1340371&r1=1340370&r2=1340371&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/Bzip2Compressor.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/Bzip2Compressor.java Sat May 19 08:32:28 2012
@@ -34,6 +34,7 @@ public class Bzip2Compressor<M extends W
 
   private final BZip2Codec codec = new BZip2Codec();
 
+  @Override
   public BSPCompressedBundle compressBundle(BSPMessageBundle<M> bundle) {
     BSPCompressedBundle compMsgBundle = null;
     ByteArrayOutputStream bos = null;
@@ -71,6 +72,7 @@ public class Bzip2Compressor<M extends W
    * @param compMsgBundle
    * @return
    */
+  @Override
   public BSPMessageBundle<M> decompressBundle(BSPCompressedBundle compMsgBundle) {
     ByteArrayInputStream bis = null;
     CompressionInputStream sis = null;

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/SnappyCompressor.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/SnappyCompressor.java?rev=1340371&r1=1340370&r2=1340371&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/SnappyCompressor.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/SnappyCompressor.java Sat May 19 08:32:28 2012
@@ -31,6 +31,7 @@ import org.xerial.snappy.SnappyOutputStr
 public class SnappyCompressor<M extends Writable> implements
     BSPMessageCompressor<M> {
 
+  @Override
   public BSPCompressedBundle compressBundle(BSPMessageBundle<M> bundle) {
     BSPCompressedBundle compMsgBundle = null;
     ByteArrayOutputStream bos = null;
@@ -68,6 +69,7 @@ public class SnappyCompressor<M extends 
    * @param compMsgBundle
    * @return
    */
+  @Override
   public BSPMessageBundle<M> decompressBundle(BSPCompressedBundle compMsgBundle) {
     ByteArrayInputStream bis = null;
     SnappyInputStream sis = null;

Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/MiniZooKeeperCluster.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/MiniZooKeeperCluster.java?rev=1340371&r1=1340370&r2=1340371&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/MiniZooKeeperCluster.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/MiniZooKeeperCluster.java Sat May 19 08:32:28 2012
@@ -117,7 +117,7 @@ public class MiniZooKeeperCluster {
     return clientPort;
   }
 
-  private void recreateDir(File dir) throws IOException {
+  private static void recreateDir(File dir) throws IOException {
     if (dir.exists()) {
       FileUtil.fullyDelete(dir);
     }

Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java?rev=1340371&r1=1340370&r2=1340371&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java Sat May 19 08:32:28 2012
@@ -132,62 +132,31 @@ public class TestBSPMasterGroomServer ex
    */
 
   /*
-   * BEGIN: ZooKeeper tests.
-
-  public void testClearZKNodes() throws IOException, KeeperException,
-      InterruptedException {
-
-    // Clear any existing znode with the same path as bspRoot.
-    bspCluster.getBSPMaster().clearZKNodes();
-
-    int timeout = configuration.getInt(Constants.ZOOKEEPER_SESSION_TIMEOUT,
-        6000);
-    String connectStr = QuorumPeer.getZKQuorumServersString(configuration);
-    String bspRoot = configuration.get(Constants.ZOOKEEPER_ROOT,
-        Constants.DEFAULT_ZOOKEEPER_ROOT);
-
-    // Establishing a zk session.
-    ZooKeeper zk = new ZooKeeper(connectStr, timeout, new Watcher() {
-      @Override
-      public void process(WatchedEvent event) {
-        // Do nothing.(Dummy Watcher)
-      }
-    });
-
-    // Creating dummy bspRoot if it doesn't already exist.
-    Stat s = zk.exists(bspRoot, false);
-    if (s == null) {
-      zk.create(bspRoot, new byte[0], Ids.OPEN_ACL_UNSAFE,
-          CreateMode.PERSISTENT);
-    }
-
-    // Creating dummy child nodes at depth 1.
-    String node1 = bspRoot + "/task1";
-    String node2 = bspRoot + "/task2";
-    zk.create(node1, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-    zk.create(node2, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-
-    // Creating dummy child node at depth 2.
-    String node11 = node1 + "/superstep1";
-    zk.create(node11, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-
-    ArrayList<String> list = (ArrayList<String>) zk.getChildren(bspRoot, false);
-    assertEquals(2, list.size());
-    System.out.println(list.size());
-
-    bspCluster.getBSPMaster().clearZKNodes();
-
-    list = (ArrayList<String>) zk.getChildren(bspRoot, false);
-    System.out.println(list.size());
-    assertEquals(0, list.size());
-
-    try {
-      zk.getData(node11, false, null);
-      fail();
-    } catch (KeeperException.NoNodeException e) {
-      System.out.println("Node has been removed correctly!");
-    }
-  }
+   * BEGIN: ZooKeeper tests. public void testClearZKNodes() throws IOException,
+   * KeeperException, InterruptedException { // Clear any existing znode with
+   * the same path as bspRoot. bspCluster.getBSPMaster().clearZKNodes(); int
+   * timeout = configuration.getInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 6000);
+   * String connectStr = QuorumPeer.getZKQuorumServersString(configuration);
+   * String bspRoot = configuration.get(Constants.ZOOKEEPER_ROOT,
+   * Constants.DEFAULT_ZOOKEEPER_ROOT); // Establishing a zk session. ZooKeeper
+   * zk = new ZooKeeper(connectStr, timeout, new Watcher() {
+   * @Override public void process(WatchedEvent event) { // Do nothing.(Dummy
+   * Watcher) } }); // Creating dummy bspRoot if it doesn't already exist. Stat
+   * s = zk.exists(bspRoot, false); if (s == null) { zk.create(bspRoot, new
+   * byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } // Creating dummy
+   * child nodes at depth 1. String node1 = bspRoot + "/task1"; String node2 =
+   * bspRoot + "/task2"; zk.create(node1, new byte[0], Ids.OPEN_ACL_UNSAFE,
+   * CreateMode.PERSISTENT); zk.create(node2, new byte[0], Ids.OPEN_ACL_UNSAFE,
+   * CreateMode.PERSISTENT); // Creating dummy child node at depth 2. String
+   * node11 = node1 + "/superstep1"; zk.create(node11, new byte[0],
+   * Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); ArrayList<String> list =
+   * (ArrayList<String>) zk.getChildren(bspRoot, false); assertEquals(2,
+   * list.size()); System.out.println(list.size());
+   * bspCluster.getBSPMaster().clearZKNodes(); list = (ArrayList<String>)
+   * zk.getChildren(bspRoot, false); System.out.println(list.size());
+   * assertEquals(0, list.size()); try { zk.getData(node11, false, null);
+   * fail(); } catch (KeeperException.NoNodeException e) {
+   * System.out.println("Node has been removed correctly!"); } }
    */
 
   /*

Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java?rev=1340371&r1=1340370&r2=1340371&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java Sat May 19 08:32:28 2012
@@ -54,6 +54,7 @@ import org.apache.hama.bsp.sync.SyncClie
 import org.apache.hama.bsp.sync.SyncException;
 import org.apache.hama.bsp.sync.SyncServiceFactory;
 import org.apache.hama.ipc.BSPPeerProtocol;
+import org.apache.hama.ipc.HamaRPCProtocolVersion;
 import org.apache.hama.util.BSPNetUtils;
 
 public class TestBSPTaskFaults extends TestCase {
@@ -97,7 +98,7 @@ public class TestBSPTaskFaults extends T
     @Override
     public long getProtocolVersion(String protocol, long clientVersion)
         throws IOException {
-      return BSPPeerProtocol.versionID;
+      return HamaRPCProtocolVersion.versionID;
     }
 
     @Override
@@ -176,6 +177,7 @@ public class TestBSPTaskFaults extends T
       job = jobConf;
     }
 
+    @Override
     @SuppressWarnings("rawtypes")
     public void run() {
       BSPTask task = new BSPTask();
@@ -213,7 +215,7 @@ public class TestBSPTaskFaults extends T
       testPort = port;
     }
 
-    private void readStream(InputStream input) throws IOException {
+    private static void readStream(InputStream input) throws IOException {
       BufferedReader reader = new BufferedReader(new InputStreamReader(input));
       String line;
       while ((line = reader.readLine()) != null) {
@@ -278,6 +280,7 @@ public class TestBSPTaskFaults extends T
         // We have errorLog and infoLog to prevent block on pipe between
         // child and parent process.
         errorLog = new Thread() {
+          @Override
           public void run() {
             try {
               readStream(bspTaskProcess.getErrorStream());
@@ -289,6 +292,7 @@ public class TestBSPTaskFaults extends T
         errorLog.start();
 
         infoLog = new Thread() {
+          @Override
           public void run() {
             try {
               readStream(bspTaskProcess.getInputStream());
@@ -329,7 +333,7 @@ public class TestBSPTaskFaults extends T
         job.setOutputFormat(NullOutputFormat.class);
         
         final BSPPeerProtocol proto = (BSPPeerProtocol) RPC.getProxy(
-            BSPPeerProtocol.class, BSPPeerProtocol.versionID,
+            BSPPeerProtocol.class, HamaRPCProtocolVersion.versionID,
             new InetSocketAddress("127.0.0.1", port), hamaConf);
 
         BSPTask task = new BSPTask();
@@ -339,6 +343,7 @@ public class TestBSPTaskFaults extends T
             + hamaConf.getInt(TEST_POINT, 0));
 
         Runtime.getRuntime().addShutdownHook(new Thread() {
+          @Override
           public void run() {
             try {
               proto.close();
@@ -430,7 +435,7 @@ public class TestBSPTaskFaults extends T
     conf.setInt("bsp.groom.rpc.port", inetAddress.getPort());
 
     umbilical = (BSPPeerProtocol) RPC.getProxy(BSPPeerProtocol.class,
-        BSPPeerProtocol.versionID, inetAddress, conf);
+        HamaRPCProtocolVersion.versionID, inetAddress, conf);
     LOG.info("Started the proxy connections");
 
     this.testBSPTaskService = Executors.newScheduledThreadPool(1);

Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java?rev=1340371&r1=1340370&r2=1340371&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java Sat May 19 08:32:28 2012
@@ -36,6 +36,7 @@ import org.apache.hama.bsp.message.type.
 import org.apache.hama.bsp.sync.SyncClient;
 import org.apache.hama.bsp.sync.SyncServiceFactory;
 import org.apache.hama.ipc.BSPPeerProtocol;
+import org.apache.hama.ipc.HamaRPCProtocolVersion;
 import org.apache.hama.util.BSPNetUtils;
 
 public class TestCheckpoint extends TestCase {
@@ -101,7 +102,7 @@ public class TestCheckpoint extends Test
     conf.setInt("bsp.groom.rpc.port", inetAddress.getPort());
 
     BSPPeerProtocol umbilical = (BSPPeerProtocol) RPC.getProxy(
-        BSPPeerProtocol.class, BSPPeerProtocol.versionID, inetAddress, conf);
+        BSPPeerProtocol.class, HamaRPCProtocolVersion.versionID, inetAddress, conf);
     LOG.info("Started the proxy connections");
 
     TaskAttemptID tid = new TaskAttemptID(new TaskID(new BSPJobID(
@@ -109,9 +110,10 @@ public class TestCheckpoint extends Test
 
     try {
       BSPJob job = new BSPJob(conf);
-      job.setOutputFormat(NullOutputFormat.class);
+      job.setOutputPath(TestBSPMasterGroomServer.OUTPUT_PATH);
+      job.setOutputFormat(TextOutputFormat.class);
       final BSPPeerProtocol proto = (BSPPeerProtocol) RPC.getProxy(
-          BSPPeerProtocol.class, BSPPeerProtocol.versionID,
+          BSPPeerProtocol.class, HamaRPCProtocolVersion.versionID,
           new InetSocketAddress("127.0.0.1", port), conf);
 
       BSPTask task = new BSPTask();

Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestClusterStatus.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestClusterStatus.java?rev=1340371&r1=1340370&r2=1340371&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestClusterStatus.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestClusterStatus.java Sat May 19 08:32:28 2012
@@ -32,6 +32,7 @@ import org.apache.hadoop.io.DataOutputBu
 public class TestClusterStatus extends TestCase {
   Random rnd = new Random();
 
+  @Override
   protected void setUp() throws Exception {
     super.setUp();
   }

Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java?rev=1340371&r1=1340370&r2=1340371&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java Sat May 19 08:32:28 2012
@@ -22,32 +22,20 @@ package org.apache.hama.bsp;
 import java.io.IOException;
 import java.util.ArrayList;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
 import org.apache.hama.Constants;
 import org.apache.hama.HamaCluster;
 import org.apache.hama.HamaConfiguration;
-import org.apache.hama.examples.ClassSerializePrinting;
 import org.apache.hama.zookeeper.QuorumPeer;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.Stat;
 
 public class TestZooKeeper extends HamaCluster {
 
-  private static Log LOG = LogFactory.getLog(TestZooKeeper.class);
-
   private HamaConfiguration configuration;
 
   public TestZooKeeper() {
@@ -63,10 +51,12 @@ public class TestZooKeeper extends HamaC
             .getCanonicalName());
   }
 
+  @Override
   public void setUp() throws Exception {
     super.setUp();
   }
 
+  @Override
   public void tearDown() throws Exception {
     super.tearDown();
   }

Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestAvroMessageManager.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestAvroMessageManager.java?rev=1340371&r1=1340370&r2=1340371&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestAvroMessageManager.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestAvroMessageManager.java Sat May 19 08:32:28 2012
@@ -93,7 +93,7 @@ public class TestAvroMessageManager exte
 
   }
 
-  public final BSPMessageBundle<Writable> getRandomBundle() {
+  public final static BSPMessageBundle<Writable> getRandomBundle() {
     BSPMessageBundle<Writable> bundle = new BSPMessageBundle<Writable>();
 
     for (int i = 0; i < INT_MSG_COUNT; i++) {

Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java?rev=1340371&r1=1340370&r2=1340371&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java Sat May 19 08:32:28 2012
@@ -55,7 +55,7 @@ public class TestHadoopMessageManager ex
     messagingInternal(conf);
   }
 
-  private void messagingInternal(Configuration conf) throws Exception {
+  private static void messagingInternal(Configuration conf) throws Exception {
     conf.set(MessageManagerFactory.MESSAGE_MANAGER_CLASS,
         "org.apache.hama.bsp.message.HadoopMessageManagerImpl");
     MessageManager<IntWritable> messageManager = MessageManagerFactory