You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by tj...@apache.org on 2012/05/19 10:32:29 UTC
svn commit: r1340371 - in /incubator/hama/trunk: ./
core/src/main/java/org/apache/hama/bsp/message/
core/src/main/java/org/apache/hama/bsp/message/compress/
core/src/test/java/org/apache/hama/ core/src/test/java/org/apache/hama/bsp/
core/src/test/java/...
Author: tjungblut
Date: Sat May 19 08:32:28 2012
New Revision: 1340371
URL: http://svn.apache.org/viewvc?rev=1340371&view=rev
Log:
[HAMA-566]: Add a disk-based queue
Added:
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/SingleLockQueue.java
Modified:
incubator/hama/trunk/CHANGES.txt
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroBSPMessageBundle.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/DiskQueue.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/SynchronizedQueue.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/Bzip2Compressor.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/SnappyCompressor.java
incubator/hama/trunk/core/src/test/java/org/apache/hama/MiniZooKeeperCluster.java
incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java
incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestClusterStatus.java
incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java
incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestAvroMessageManager.java
incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java
Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1340371&r1=1340370&r2=1340371&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Sat May 19 08:32:28 2012
@@ -4,6 +4,7 @@ Release 0.5 - April 10, 2012
NEW FEATURES
+ HAMA-566: Add disk-based queue (tjungblut)
HAMA-552: Add a sorted message queue (tjungblut)
HAMA-556: Graph package to support stopping the interations when the node changes are within the tolerance value as in the case of page rank (tjungblut)
HAMA-508: Add clean plugin (Mikalai Parafeniuk via edwardyoon)
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java?rev=1340371&r1=1340370&r2=1340371&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java Sat May 19 08:32:28 2012
@@ -28,6 +28,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hama.bsp.BSPPeer;
@@ -35,10 +36,15 @@ import org.apache.hama.bsp.BSPPeerImpl;
import org.apache.hama.bsp.TaskAttemptID;
import org.apache.hama.util.BSPNetUtils;
+/**
+ * Abstract base class that contains all information and services needed by
+ * the concrete RPC subclasses. For example, it manages the message queues and
+ * maintains a cache of socket addresses.
+ */
public abstract class AbstractMessageManager<M extends Writable> implements
MessageManager<M>, Configurable {
- private static final Log LOG = LogFactory
+ protected static final Log LOG = LogFactory
.getLog(AbstractMessageManager.class);
// conf is injected via reflection of the factory
@@ -48,12 +54,19 @@ public abstract class AbstractMessageMan
protected MessageQueue<M> localQueue;
// this must be a synchronized implementation: this is accessed per RPC
protected SynchronizedQueue<M> localQueueForNextIteration;
+ // this peer object is only used for incrementing counters
protected BSPPeer<?, ?, ?, ?, M> peer;
// the peer address of this peer
protected InetSocketAddress peerAddress;
// the task attempt id
protected TaskAttemptID attemptId;
+ /*
+ * (non-Javadoc)
+ * @see org.apache.hama.bsp.message.MessageManager#init(org.apache.hama.bsp.
+ * TaskAttemptID, org.apache.hama.bsp.BSPPeer,
+ * org.apache.hadoop.conf.Configuration, java.net.InetSocketAddress)
+ */
@Override
public void init(TaskAttemptID attemptId, BSPPeer<?, ?, ?, ?, M> peer,
Configuration conf, InetSocketAddress peerAddress) {
@@ -62,11 +75,13 @@ public abstract class AbstractMessageMan
this.conf = conf;
this.peerAddress = peerAddress;
localQueue = getQueue();
- localQueue.init(conf, attemptId);
localQueueForNextIteration = getSynchronizedQueue();
- localQueueForNextIteration.init(conf, attemptId);
}
+ /*
+ * (non-Javadoc)
+ * @see org.apache.hama.bsp.message.MessageManager#close()
+ */
@Override
public void close() {
Collection<MessageQueue<M>> values = outgoingQueues.values();
@@ -74,8 +89,21 @@ public abstract class AbstractMessageMan
msgQueue.close();
}
localQueue.close();
+ // remove possible disk queues from the path
+ try {
+ FileSystem.get(conf).delete(
+ DiskQueue.getQueueDir(conf, attemptId,
+ conf.get(DiskQueue.DISK_QUEUE_PATH_KEY)), true);
+ } catch (IOException e) {
+ LOG.warn("Queue dir couldn't be deleted");
+ }
+
}
+ /*
+ * (non-Javadoc)
+ * @see org.apache.hama.bsp.message.MessageManager#finishSendPhase()
+ */
@Override
public void finishSendPhase() throws IOException {
Collection<MessageQueue<M>> values = outgoingQueues.values();
@@ -84,29 +112,42 @@ public abstract class AbstractMessageMan
}
}
+ /*
+ * (non-Javadoc)
+ * @see org.apache.hama.bsp.message.MessageManager#getCurrentMessage()
+ */
@Override
public final M getCurrentMessage() throws IOException {
return localQueue.poll();
}
+ /*
+ * (non-Javadoc)
+ * @see org.apache.hama.bsp.message.MessageManager#getNumCurrentMessages()
+ */
@Override
public final int getNumCurrentMessages() {
return localQueue.size();
}
+ /*
+ * (non-Javadoc)
+ * @see org.apache.hama.bsp.message.MessageManager#clearOutgoingQueues()
+ */
@Override
public final void clearOutgoingQueues() {
- this.outgoingQueues.clear();
- localQueueForNextIteration.prepareRead();
- localQueue.prepareWrite();
- localQueue.addAll(localQueueForNextIteration.getMessageQueue());
+ localQueue = localQueueForNextIteration.getMessageQueue();
localQueue.prepareRead();
- localQueueForNextIteration.clear();
+ localQueueForNextIteration = getSynchronizedQueue();
}
+ /*
+ * (non-Javadoc)
+ * @see org.apache.hama.bsp.message.MessageManager#send(java.lang.String,
+ * org.apache.hadoop.io.Writable)
+ */
@Override
public void send(String peerName, M msg) throws IOException {
- LOG.debug("Send message (" + msg.toString() + ") to " + peerName);
InetSocketAddress targetPeerAddress = null;
// Get socket for target peer.
if (peerSocketCache.containsKey(peerName)) {
@@ -124,6 +165,10 @@ public abstract class AbstractMessageMan
outgoingQueues.put(targetPeerAddress, queue);
}
+ /*
+ * (non-Javadoc)
+ * @see org.apache.hama.bsp.message.MessageManager#getMessageIterator()
+ */
@Override
public final Iterator<Entry<InetSocketAddress, MessageQueue<M>>> getMessageIterator() {
return this.outgoingQueues.entrySet().iterator();
@@ -132,12 +177,14 @@ public abstract class AbstractMessageMan
/**
* Returns a new queue implementation based on what was configured. If nothing
* has been configured for "hama.messenger.queue.class" then the
- * {@link MemoryQueue} is used.
+ * {@link MemoryQueue} is used. If you run into scalability issues, consider
+ * using {@link DiskQueue} instead.
*
* @return a <b>new</b> queue implementation.
*/
protected MessageQueue<M> getQueue() {
Class<?> queueClass = conf.getClass(QUEUE_TYPE_CLASS, MemoryQueue.class);
+ LOG.debug("Creating new " + queueClass);
@SuppressWarnings("unchecked")
MessageQueue<M> newInstance = (MessageQueue<M>) ReflectionUtils
.newInstance(queueClass, conf);
@@ -146,13 +193,15 @@ public abstract class AbstractMessageMan
}
protected SynchronizedQueue<M> getSynchronizedQueue() {
- return SynchronizedQueue.synchronize(getQueue());
+ return SingleLockQueue.synchronize(getQueue());
}
+ @Override
public final Configuration getConf() {
return conf;
}
+ @Override
public final void setConf(Configuration conf) {
this.conf = conf;
}
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroBSPMessageBundle.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroBSPMessageBundle.java?rev=1340371&r1=1340370&r2=1340371&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroBSPMessageBundle.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroBSPMessageBundle.java Sat May 19 08:32:28 2012
@@ -31,11 +31,13 @@ public final class AvroBSPMessageBundle<
@Deprecated
public java.nio.ByteBuffer data;
+ @Override
public final org.apache.avro.Schema getSchema() {
return SCHEMA$;
}
// Used by DatumWriter. Applications should not call.
+ @Override
public final java.lang.Object get(int field$) {
switch (field$) {
case 0:
@@ -46,6 +48,7 @@ public final class AvroBSPMessageBundle<
}
// Used by DatumReader. Applications should not call.
+ @Override
public final void put(int field$, java.lang.Object value$) {
switch (field$) {
case 0:
@@ -116,12 +119,12 @@ public final class AvroBSPMessageBundle<
private Builder(AvroBSPMessageBundle<?> other) {
super(AvroBSPMessageBundle.SCHEMA$);
if (isValidValue(fields[0], other.data)) {
- data = (java.nio.ByteBuffer) clone(other.data);
+ data = clone(other.data);
fieldSetFlags[0] = true;
}
}
- public final ByteBuffer clone(ByteBuffer original) {
+ public final static ByteBuffer clone(ByteBuffer original) {
ByteBuffer clone = ByteBuffer.allocate(original.capacity());
original.rewind();
clone.put(original);
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java?rev=1340371&r1=1340370&r2=1340371&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java Sat May 19 08:32:28 2012
@@ -81,7 +81,7 @@ public final class AvroMessageManagerImp
if (sender == null) {
NettyTransceiver client = new NettyTransceiver(addr);
- sender = (Sender<M>) SpecificRequestor.getClient(Sender.class, client);
+ sender = SpecificRequestor.getClient(Sender.class, client);
peers.put(addr, sender);
}
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/DiskQueue.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/DiskQueue.java?rev=1340371&r1=1340370&r2=1340371&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/DiskQueue.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/DiskQueue.java Sat May 19 08:32:28 2012
@@ -24,41 +24,45 @@ import java.util.Iterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.ObjectWritable;
-import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hama.bsp.TaskAttemptID;
/**
- * A disk based queue that is backed by a sequencefile. <br/>
+ * A disk based queue that is backed by a raw file on local disk. <br/>
* Structure is as follows: <br/>
* If "bsp.disk.queue.dir" is not defined, "hama.tmp.dir" will be used instead. <br/>
* ${hama.tmp.dir}/diskqueue/job_id/task_attempt_id/ <br/>
* An ongoing sequencenumber will be appended to prevent inner collisions,
* however the job_id dir will never be deleted. So you need a cronjob to do the
* cleanup for you. <br/>
- * <b>This is currently not intended to be production ready</b>
+ * It is recommended to prefix the configured path with the file:// scheme,
+ * because writes to DFS are expensive. However, your local disk may not have
+ * enough space for your messages, so you can easily switch per job via your
+ * configuration. <br/>
+ * <b>This queue is experimental.</b>
*/
public final class DiskQueue<M extends Writable> implements MessageQueue<M> {
public static final String DISK_QUEUE_PATH_KEY = "bsp.disk.queue.dir";
- private final Log LOG = LogFactory.getLog(DiskQueue.class);
+ private static final int MAX_RETRIES = 4;
+ private static final Log LOG = LogFactory.getLog(DiskQueue.class);
private static volatile int ONGOING_SEQUENCE_NUMBER = 0;
- private static final int MAX_RETRIES = 4;
- private static final NullWritable NULL_WRITABLE = NullWritable.get();
private int size = 0;
// injected via reflection
private Configuration conf;
private FileSystem fs;
- private SequenceFile.Writer writer;
- private SequenceFile.Reader reader;
+ private FSDataOutputStream writer;
+ private FSDataInputStream reader;
+
private Path queuePath;
private TaskAttemptID id;
private final ObjectWritable writable = new ObjectWritable();
@@ -71,17 +75,7 @@ public final class DiskQueue<M extends W
fs = FileSystem.get(conf);
String configuredQueueDir = conf.get(DISK_QUEUE_PATH_KEY);
Path queueDir = null;
- if (configuredQueueDir == null) {
- String hamaTmpDir = conf.get("hama.tmp.dir");
- if (hamaTmpDir != null) {
- queueDir = createDiskQueuePath(id, hamaTmpDir);
- } else {
- // use some local tmp dir
- queueDir = createDiskQueuePath(id, "/tmp/messageStorage/");
- }
- } else {
- queueDir = createDiskQueuePath(id, configuredQueueDir);
- }
+ queueDir = getQueueDir(conf, id, configuredQueueDir);
fs.mkdirs(queueDir);
queuePath = new Path(queueDir, (ONGOING_SEQUENCE_NUMBER++)
+ "_messages.seq");
@@ -109,6 +103,7 @@ public final class DiskQueue<M extends W
private void closeInternal(boolean delete) {
try {
if (writer != null) {
+ writer.flush();
writer.close();
writer = null;
}
@@ -124,6 +119,7 @@ public final class DiskQueue<M extends W
}
if (writer != null) {
try {
+ writer.flush();
writer.close();
writer = null;
} catch (IOException e) {
@@ -155,7 +151,7 @@ public final class DiskQueue<M extends W
// make sure we've closed
closeInternal(false);
try {
- reader = new SequenceFile.Reader(fs, queuePath, conf);
+ reader = fs.open(queuePath);
} catch (IOException e) {
// can't recover from that
LOG.error(e);
@@ -166,8 +162,7 @@ public final class DiskQueue<M extends W
@Override
public void prepareWrite() {
try {
- writer = new SequenceFile.Writer(fs, conf, queuePath,
- ObjectWritable.class, NullWritable.class);
+ writer = fs.create(queuePath);
} catch (IOException e) {
// can't recover from that
LOG.error(e);
@@ -194,7 +189,7 @@ public final class DiskQueue<M extends W
public final void add(M item) {
size++;
try {
- writer.append(new ObjectWritable(item), NULL_WRITABLE);
+ new ObjectWritable(item).write(writer);
} catch (IOException e) {
LOG.error(e);
}
@@ -210,16 +205,19 @@ public final class DiskQueue<M extends W
@SuppressWarnings("unchecked")
@Override
public final M poll() {
+ if (size == 0) {
+ return null;
+ }
size--;
int tries = 1;
while (tries <= MAX_RETRIES) {
try {
- boolean next = reader.next(writable, NULL_WRITABLE);
- if (next) {
+ writable.readFields(reader);
+ if (size > 0) {
return (M) writable.get();
} else {
- closeInternal(false);
- return null;
+ closeInternal(true);
+ return (M) writable.get();
}
} catch (IOException e) {
LOG.error("Retrying for the " + tries + "th time!", e);
@@ -277,11 +275,32 @@ public final class DiskQueue<M extends W
}
/**
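+ * Creates the queue directory path for a task attempt, based on the configured queue dir if set, else "hama.tmp.dir", else "/tmp/messageStorage/".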
+ */
+ public static Path getQueueDir(Configuration conf, TaskAttemptID id,
+ String configuredQueueDir) {
+ Path queueDir;
+ if (configuredQueueDir == null) {
+ String hamaTmpDir = conf.get("hama.tmp.dir");
+ if (hamaTmpDir != null) {
+ queueDir = createDiskQueuePath(id, hamaTmpDir);
+ } else {
+ // use some local tmp dir
+ queueDir = createDiskQueuePath(id, "/tmp/messageStorage/");
+ }
+ } else {
+ queueDir = createDiskQueuePath(id, configuredQueueDir);
+ }
+ return queueDir;
+ }
+
+ /**
* Creates a generic Path based on the configured path and the task attempt id
* to store disk sequence files. <br/>
* Structure is as follows: ${hama.tmp.dir}/diskqueue/job_id/task_attempt_id/
*/
- public static Path createDiskQueuePath(TaskAttemptID id, String configuredPath) {
+ private static Path createDiskQueuePath(TaskAttemptID id,
+ String configuredPath) {
return new Path(new Path(new Path(configuredPath, "diskqueue"), id
.getJobID().toString()), id.getTaskID().toString());
}
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java?rev=1340371&r1=1340370&r2=1340371&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java Sat May 19 08:32:28 2012
@@ -32,6 +32,7 @@ import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.BSPPeerImpl;
import org.apache.hama.bsp.TaskAttemptID;
import org.apache.hama.bsp.message.compress.BSPCompressedBundle;
+import org.apache.hama.ipc.HamaRPCProtocolVersion;
import org.apache.hama.util.CompressionUtil;
/**
@@ -107,7 +108,7 @@ public final class HadoopMessageManagerI
HadoopMessageManager<M> peer = peers.get(addr);
if (peer == null) {
peer = (HadoopMessageManager<M>) RPC.getProxy(HadoopMessageManager.class,
- HadoopMessageManager.versionID, addr, this.conf);
+ HamaRPCProtocolVersion.versionID, addr, this.conf);
this.peers.put(addr, peer);
}
return peer;
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/SingleLockQueue.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/SingleLockQueue.java?rev=1340371&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/SingleLockQueue.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/SingleLockQueue.java Sat May 19 08:32:28 2012
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message;
+
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hama.bsp.TaskAttemptID;
+
+/**
+ * A global mutex based synchronized queue.
+ */
+public final class SingleLockQueue<T> implements SynchronizedQueue<T> {
+
+ private final MessageQueue<T> queue;
+ private final Object mutex;
+
+ private SingleLockQueue(MessageQueue<T> queue) {
+ this.queue = queue;
+ this.mutex = new Object();
+ }
+
+ private SingleLockQueue(MessageQueue<T> queue, Object mutex) {
+ this.queue = queue;
+ this.mutex = mutex;
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.apache.hama.bsp.message.SynchronizedQueue#iterator()
+ */
+ @Override
+ public Iterator<T> iterator() {
+ synchronized (mutex) {
+ return queue.iterator();
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see
+ * org.apache.hama.bsp.message.SynchronizedQueue#setConf(org.apache.hadoop
+ * .conf.Configuration)
+ */
+ @Override
+ public void setConf(Configuration conf) {
+ synchronized (mutex) {
+ queue.setConf(conf);
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.apache.hama.bsp.message.SynchronizedQueue#getConf()
+ */
+ @Override
+ public Configuration getConf() {
+ synchronized (mutex) {
+ return queue.getConf();
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see
+ * org.apache.hama.bsp.message.SynchronizedQueue#init(org.apache.hadoop.conf
+ * .Configuration, org.apache.hama.bsp.TaskAttemptID)
+ */
+ @Override
+ public void init(Configuration conf, TaskAttemptID id) {
+ synchronized (mutex) {
+ queue.init(conf, id);
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.apache.hama.bsp.message.SynchronizedQueue#close()
+ */
+ @Override
+ public void close() {
+ synchronized (mutex) {
+ }
+ queue.close();
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.apache.hama.bsp.message.SynchronizedQueue#prepareRead()
+ */
+ @Override
+ public void prepareRead() {
+ synchronized (mutex) {
+ queue.prepareRead();
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see
+ * org.apache.hama.bsp.message.SynchronizedQueue#addAll(java.util.Collection)
+ */
+ @Override
+ public void addAll(Collection<T> col) {
+ synchronized (mutex) {
+ queue.addAll(col);
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.apache.hama.bsp.message.SynchronizedQueue#add(T)
+ */
+ @Override
+ public void add(T item) {
+ synchronized (mutex) {
+ queue.add(item);
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.apache.hama.bsp.message.SynchronizedQueue#clear()
+ */
+ @Override
+ public void clear() {
+ synchronized (mutex) {
+ queue.clear();
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.apache.hama.bsp.message.SynchronizedQueue#poll()
+ */
+ @Override
+ public Object poll() {
+ synchronized (mutex) {
+ return queue.poll();
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.apache.hama.bsp.message.SynchronizedQueue#size()
+ */
+ @Override
+ public int size() {
+ synchronized (mutex) {
+ return queue.size();
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.apache.hama.bsp.message.SynchronizedQueue#getMessageQueue()
+ */
+ @Override
+ public MessageQueue<T> getMessageQueue() {
+ synchronized (mutex) {
+ return queue;
+ }
+ }
+
+ /*
+ * static constructor methods to be type safe
+ */
+
+ public static <T> SynchronizedQueue<T> synchronize(MessageQueue<T> queue) {
+ return new SingleLockQueue<T>(queue);
+ }
+
+ public static <T> SynchronizedQueue<T> synchronize(MessageQueue<T> queue,
+ Object mutex) {
+ return new SingleLockQueue<T>(queue, mutex);
+ }
+}
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/SynchronizedQueue.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/SynchronizedQueue.java?rev=1340371&r1=1340370&r2=1340371&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/SynchronizedQueue.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/SynchronizedQueue.java Sat May 19 08:32:28 2012
@@ -20,110 +20,34 @@ package org.apache.hama.bsp.message;
import java.util.Collection;
import java.util.Iterator;
+import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hama.bsp.TaskAttemptID;
/**
- * A global mutex based synchronized queue.
+ * Synchronized queue interface. Can be used to implement better synchronized
+ * data structures.
*/
-public final class SynchronizedQueue<T> {
+public interface SynchronizedQueue<T> extends Configurable {
- private final MessageQueue<T> queue;
- private final Object mutex;
+ public abstract Iterator<T> iterator();
- private SynchronizedQueue(MessageQueue<T> queue) {
- this.queue = queue;
- this.mutex = new Object();
- }
-
- private SynchronizedQueue(MessageQueue<T> queue, Object mutex) {
- this.queue = queue;
- this.mutex = mutex;
- }
-
- public Iterator<T> iterator() {
- synchronized (mutex) {
- return queue.iterator();
- }
- }
-
- public void setConf(Configuration conf) {
- synchronized (mutex) {
- queue.setConf(conf);
- }
- }
-
- public Configuration getConf() {
- synchronized (mutex) {
- return queue.getConf();
- }
- }
-
- public void init(Configuration conf, TaskAttemptID id) {
- synchronized (mutex) {
- queue.init(conf, id);
- }
- }
-
- public void close() {
- synchronized (mutex) {
- }
- queue.close();
- }
-
- public void prepareRead() {
- synchronized (mutex) {
- queue.prepareRead();
- }
- }
-
- public void addAll(Collection<T> col) {
- synchronized (mutex) {
- queue.addAll(col);
- }
- }
-
- public void add(T item) {
- synchronized (mutex) {
- queue.add(item);
- }
- }
-
- public void clear() {
- synchronized (mutex) {
- queue.clear();
- }
- }
-
- public Object poll() {
- synchronized (mutex) {
- return queue.poll();
- }
- }
-
- public int size() {
- synchronized (mutex) {
- return queue.size();
- }
- }
-
- public MessageQueue<T> getMessageQueue() {
- synchronized (mutex) {
- return queue;
- }
- }
-
- /*
- * static constructor methods to be type safe
- */
-
- public static <T> SynchronizedQueue<T> synchronize(MessageQueue<T> queue) {
- return new SynchronizedQueue<T>(queue);
- }
-
- public static <T> SynchronizedQueue<T> synchronize(MessageQueue<T> queue,
- Object mutex) {
- return new SynchronizedQueue<T>(queue, mutex);
- }
+ public abstract void init(Configuration conf, TaskAttemptID id);
+
+ public abstract void close();
+
+ public abstract void prepareRead();
+
+ public abstract void addAll(Collection<T> col);
+
+ public abstract void add(T item);
+
+ public abstract void clear();
+
+ public abstract Object poll();
+
+ public abstract int size();
+
+ public abstract MessageQueue<T> getMessageQueue();
}
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/Bzip2Compressor.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/Bzip2Compressor.java?rev=1340371&r1=1340370&r2=1340371&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/Bzip2Compressor.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/Bzip2Compressor.java Sat May 19 08:32:28 2012
@@ -34,6 +34,7 @@ public class Bzip2Compressor<M extends W
private final BZip2Codec codec = new BZip2Codec();
+ @Override
public BSPCompressedBundle compressBundle(BSPMessageBundle<M> bundle) {
BSPCompressedBundle compMsgBundle = null;
ByteArrayOutputStream bos = null;
@@ -71,6 +72,7 @@ public class Bzip2Compressor<M extends W
* @param compMsgBundle
* @return
*/
+ @Override
public BSPMessageBundle<M> decompressBundle(BSPCompressedBundle compMsgBundle) {
ByteArrayInputStream bis = null;
CompressionInputStream sis = null;
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/SnappyCompressor.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/SnappyCompressor.java?rev=1340371&r1=1340370&r2=1340371&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/SnappyCompressor.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/SnappyCompressor.java Sat May 19 08:32:28 2012
@@ -31,6 +31,7 @@ import org.xerial.snappy.SnappyOutputStr
public class SnappyCompressor<M extends Writable> implements
BSPMessageCompressor<M> {
+ @Override
public BSPCompressedBundle compressBundle(BSPMessageBundle<M> bundle) {
BSPCompressedBundle compMsgBundle = null;
ByteArrayOutputStream bos = null;
@@ -68,6 +69,7 @@ public class SnappyCompressor<M extends
* @param compMsgBundle
* @return
*/
+ @Override
public BSPMessageBundle<M> decompressBundle(BSPCompressedBundle compMsgBundle) {
ByteArrayInputStream bis = null;
SnappyInputStream sis = null;
Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/MiniZooKeeperCluster.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/MiniZooKeeperCluster.java?rev=1340371&r1=1340370&r2=1340371&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/MiniZooKeeperCluster.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/MiniZooKeeperCluster.java Sat May 19 08:32:28 2012
@@ -117,7 +117,7 @@ public class MiniZooKeeperCluster {
return clientPort;
}
- private void recreateDir(File dir) throws IOException {
+ private static void recreateDir(File dir) throws IOException {
if (dir.exists()) {
FileUtil.fullyDelete(dir);
}
Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java?rev=1340371&r1=1340370&r2=1340371&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java Sat May 19 08:32:28 2012
@@ -132,62 +132,31 @@ public class TestBSPMasterGroomServer ex
*/
/*
- * BEGIN: ZooKeeper tests.
-
- public void testClearZKNodes() throws IOException, KeeperException,
- InterruptedException {
-
- // Clear any existing znode with the same path as bspRoot.
- bspCluster.getBSPMaster().clearZKNodes();
-
- int timeout = configuration.getInt(Constants.ZOOKEEPER_SESSION_TIMEOUT,
- 6000);
- String connectStr = QuorumPeer.getZKQuorumServersString(configuration);
- String bspRoot = configuration.get(Constants.ZOOKEEPER_ROOT,
- Constants.DEFAULT_ZOOKEEPER_ROOT);
-
- // Establishing a zk session.
- ZooKeeper zk = new ZooKeeper(connectStr, timeout, new Watcher() {
- @Override
- public void process(WatchedEvent event) {
- // Do nothing.(Dummy Watcher)
- }
- });
-
- // Creating dummy bspRoot if it doesn't already exist.
- Stat s = zk.exists(bspRoot, false);
- if (s == null) {
- zk.create(bspRoot, new byte[0], Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
- }
-
- // Creating dummy child nodes at depth 1.
- String node1 = bspRoot + "/task1";
- String node2 = bspRoot + "/task2";
- zk.create(node1, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- zk.create(node2, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-
- // Creating dummy child node at depth 2.
- String node11 = node1 + "/superstep1";
- zk.create(node11, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-
- ArrayList<String> list = (ArrayList<String>) zk.getChildren(bspRoot, false);
- assertEquals(2, list.size());
- System.out.println(list.size());
-
- bspCluster.getBSPMaster().clearZKNodes();
-
- list = (ArrayList<String>) zk.getChildren(bspRoot, false);
- System.out.println(list.size());
- assertEquals(0, list.size());
-
- try {
- zk.getData(node11, false, null);
- fail();
- } catch (KeeperException.NoNodeException e) {
- System.out.println("Node has been removed correctly!");
- }
- }
+ * BEGIN: ZooKeeper tests. public void testClearZKNodes() throws IOException,
+ * KeeperException, InterruptedException { // Clear any existing znode with
+ * the same path as bspRoot. bspCluster.getBSPMaster().clearZKNodes(); int
+ * timeout = configuration.getInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 6000);
+ * String connectStr = QuorumPeer.getZKQuorumServersString(configuration);
+ * String bspRoot = configuration.get(Constants.ZOOKEEPER_ROOT,
+ * Constants.DEFAULT_ZOOKEEPER_ROOT); // Establishing a zk session. ZooKeeper
+ * zk = new ZooKeeper(connectStr, timeout, new Watcher() {
+ * @Override public void process(WatchedEvent event) { // Do nothing.(Dummy
+ * Watcher) } }); // Creating dummy bspRoot if it doesn't already exist. Stat
+ * s = zk.exists(bspRoot, false); if (s == null) { zk.create(bspRoot, new
+ * byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } // Creating dummy
+ * child nodes at depth 1. String node1 = bspRoot + "/task1"; String node2 =
+ * bspRoot + "/task2"; zk.create(node1, new byte[0], Ids.OPEN_ACL_UNSAFE,
+ * CreateMode.PERSISTENT); zk.create(node2, new byte[0], Ids.OPEN_ACL_UNSAFE,
+ * CreateMode.PERSISTENT); // Creating dummy child node at depth 2. String
+ * node11 = node1 + "/superstep1"; zk.create(node11, new byte[0],
+ * Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); ArrayList<String> list =
+ * (ArrayList<String>) zk.getChildren(bspRoot, false); assertEquals(2,
+ * list.size()); System.out.println(list.size());
+ * bspCluster.getBSPMaster().clearZKNodes(); list = (ArrayList<String>)
+ * zk.getChildren(bspRoot, false); System.out.println(list.size());
+ * assertEquals(0, list.size()); try { zk.getData(node11, false, null);
+ * fail(); } catch (KeeperException.NoNodeException e) {
+ * System.out.println("Node has been removed correctly!"); } }
*/
/*
Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java?rev=1340371&r1=1340370&r2=1340371&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java Sat May 19 08:32:28 2012
@@ -54,6 +54,7 @@ import org.apache.hama.bsp.sync.SyncClie
import org.apache.hama.bsp.sync.SyncException;
import org.apache.hama.bsp.sync.SyncServiceFactory;
import org.apache.hama.ipc.BSPPeerProtocol;
+import org.apache.hama.ipc.HamaRPCProtocolVersion;
import org.apache.hama.util.BSPNetUtils;
public class TestBSPTaskFaults extends TestCase {
@@ -97,7 +98,7 @@ public class TestBSPTaskFaults extends T
@Override
public long getProtocolVersion(String protocol, long clientVersion)
throws IOException {
- return BSPPeerProtocol.versionID;
+ return HamaRPCProtocolVersion.versionID;
}
@Override
@@ -176,6 +177,7 @@ public class TestBSPTaskFaults extends T
job = jobConf;
}
+ @Override
@SuppressWarnings("rawtypes")
public void run() {
BSPTask task = new BSPTask();
@@ -213,7 +215,7 @@ public class TestBSPTaskFaults extends T
testPort = port;
}
- private void readStream(InputStream input) throws IOException {
+ private static void readStream(InputStream input) throws IOException {
BufferedReader reader = new BufferedReader(new InputStreamReader(input));
String line;
while ((line = reader.readLine()) != null) {
@@ -278,6 +280,7 @@ public class TestBSPTaskFaults extends T
// We have errorLog and infoLog to prevent block on pipe between
// child and parent process.
errorLog = new Thread() {
+ @Override
public void run() {
try {
readStream(bspTaskProcess.getErrorStream());
@@ -289,6 +292,7 @@ public class TestBSPTaskFaults extends T
errorLog.start();
infoLog = new Thread() {
+ @Override
public void run() {
try {
readStream(bspTaskProcess.getInputStream());
@@ -329,7 +333,7 @@ public class TestBSPTaskFaults extends T
job.setOutputFormat(NullOutputFormat.class);
final BSPPeerProtocol proto = (BSPPeerProtocol) RPC.getProxy(
- BSPPeerProtocol.class, BSPPeerProtocol.versionID,
+ BSPPeerProtocol.class, HamaRPCProtocolVersion.versionID,
new InetSocketAddress("127.0.0.1", port), hamaConf);
BSPTask task = new BSPTask();
@@ -339,6 +343,7 @@ public class TestBSPTaskFaults extends T
+ hamaConf.getInt(TEST_POINT, 0));
Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
public void run() {
try {
proto.close();
@@ -430,7 +435,7 @@ public class TestBSPTaskFaults extends T
conf.setInt("bsp.groom.rpc.port", inetAddress.getPort());
umbilical = (BSPPeerProtocol) RPC.getProxy(BSPPeerProtocol.class,
- BSPPeerProtocol.versionID, inetAddress, conf);
+ HamaRPCProtocolVersion.versionID, inetAddress, conf);
LOG.info("Started the proxy connections");
this.testBSPTaskService = Executors.newScheduledThreadPool(1);
Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java?rev=1340371&r1=1340370&r2=1340371&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java Sat May 19 08:32:28 2012
@@ -36,6 +36,7 @@ import org.apache.hama.bsp.message.type.
import org.apache.hama.bsp.sync.SyncClient;
import org.apache.hama.bsp.sync.SyncServiceFactory;
import org.apache.hama.ipc.BSPPeerProtocol;
+import org.apache.hama.ipc.HamaRPCProtocolVersion;
import org.apache.hama.util.BSPNetUtils;
public class TestCheckpoint extends TestCase {
@@ -101,7 +102,7 @@ public class TestCheckpoint extends Test
conf.setInt("bsp.groom.rpc.port", inetAddress.getPort());
BSPPeerProtocol umbilical = (BSPPeerProtocol) RPC.getProxy(
- BSPPeerProtocol.class, BSPPeerProtocol.versionID, inetAddress, conf);
+ BSPPeerProtocol.class, HamaRPCProtocolVersion.versionID, inetAddress, conf);
LOG.info("Started the proxy connections");
TaskAttemptID tid = new TaskAttemptID(new TaskID(new BSPJobID(
@@ -109,9 +110,10 @@ public class TestCheckpoint extends Test
try {
BSPJob job = new BSPJob(conf);
- job.setOutputFormat(NullOutputFormat.class);
+ job.setOutputPath(TestBSPMasterGroomServer.OUTPUT_PATH);
+ job.setOutputFormat(TextOutputFormat.class);
final BSPPeerProtocol proto = (BSPPeerProtocol) RPC.getProxy(
- BSPPeerProtocol.class, BSPPeerProtocol.versionID,
+ BSPPeerProtocol.class, HamaRPCProtocolVersion.versionID,
new InetSocketAddress("127.0.0.1", port), conf);
BSPTask task = new BSPTask();
Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestClusterStatus.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestClusterStatus.java?rev=1340371&r1=1340370&r2=1340371&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestClusterStatus.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestClusterStatus.java Sat May 19 08:32:28 2012
@@ -32,6 +32,7 @@ import org.apache.hadoop.io.DataOutputBu
public class TestClusterStatus extends TestCase {
Random rnd = new Random();
+ @Override
protected void setUp() throws Exception {
super.setUp();
}
Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java?rev=1340371&r1=1340370&r2=1340371&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java Sat May 19 08:32:28 2012
@@ -22,32 +22,20 @@ package org.apache.hama.bsp;
import java.io.IOException;
import java.util.ArrayList;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
import org.apache.hama.Constants;
import org.apache.hama.HamaCluster;
import org.apache.hama.HamaConfiguration;
-import org.apache.hama.examples.ClassSerializePrinting;
import org.apache.hama.zookeeper.QuorumPeer;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
public class TestZooKeeper extends HamaCluster {
- private static Log LOG = LogFactory.getLog(TestZooKeeper.class);
-
private HamaConfiguration configuration;
public TestZooKeeper() {
@@ -63,10 +51,12 @@ public class TestZooKeeper extends HamaC
.getCanonicalName());
}
+ @Override
public void setUp() throws Exception {
super.setUp();
}
+ @Override
public void tearDown() throws Exception {
super.tearDown();
}
Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestAvroMessageManager.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestAvroMessageManager.java?rev=1340371&r1=1340370&r2=1340371&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestAvroMessageManager.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestAvroMessageManager.java Sat May 19 08:32:28 2012
@@ -93,7 +93,7 @@ public class TestAvroMessageManager exte
}
- public final BSPMessageBundle<Writable> getRandomBundle() {
+ public final static BSPMessageBundle<Writable> getRandomBundle() {
BSPMessageBundle<Writable> bundle = new BSPMessageBundle<Writable>();
for (int i = 0; i < INT_MSG_COUNT; i++) {
Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java?rev=1340371&r1=1340370&r2=1340371&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java Sat May 19 08:32:28 2012
@@ -55,7 +55,7 @@ public class TestHadoopMessageManager ex
messagingInternal(conf);
}
- private void messagingInternal(Configuration conf) throws Exception {
+ private static void messagingInternal(Configuration conf) throws Exception {
conf.set(MessageManagerFactory.MESSAGE_MANAGER_CLASS,
"org.apache.hama.bsp.message.HadoopMessageManagerImpl");
MessageManager<IntWritable> messageManager = MessageManagerFactory