You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by tj...@apache.org on 2012/07/20 18:18:47 UTC
svn commit: r1363862 - in /hama/trunk: CHANGES.txt
core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
Author: tjungblut
Date: Fri Jul 20 16:18:46 2012
New Revision: 1363862
URL: http://svn.apache.org/viewvc?rev=1363862&view=rev
Log:
[HAMA-608]: LocalRunner should honor the configured queues
Modified:
hama/trunk/CHANGES.txt
hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1363862&r1=1363861&r2=1363862&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Fri Jul 20 16:18:46 2012
@@ -6,6 +6,8 @@ Release 0.6 (unreleased changes)
BUG FIXES
+ HAMA-608: LocalRunner should honor the configured queues (tjungblut)
+
IMPROVEMENTS
Release 0.5 - April 10, 2012
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1363862&r1=1363861&r2=1363862&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java Fri Jul 20 16:18:46 2012
@@ -20,17 +20,13 @@ package org.apache.hama.bsp;
import java.io.DataInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.util.HashMap;
-import java.util.Iterator;
import java.util.LinkedList;
-import java.util.Map.Entry;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.commons.logging.Log;
@@ -45,16 +41,14 @@ import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.BSPJobClient.RawSplit;
import org.apache.hama.bsp.BSPMaster.State;
-import org.apache.hama.bsp.message.MemoryQueue;
+import org.apache.hama.bsp.message.AbstractMessageManager;
import org.apache.hama.bsp.message.MessageManager;
import org.apache.hama.bsp.message.MessageManagerFactory;
-import org.apache.hama.bsp.message.MessageQueue;
import org.apache.hama.bsp.sync.SyncClient;
import org.apache.hama.bsp.sync.SyncException;
import org.apache.hama.bsp.sync.SyncServiceFactory;
import org.apache.hama.ipc.BSPPeerProtocol;
import org.apache.hama.ipc.JobSubmissionProtocol;
-import org.apache.hama.util.BSPNetUtils;
/**
* A multithreaded local BSP runner that can be used for debugging and local
@@ -68,7 +62,7 @@ public class LocalBSPRunner implements J
private volatile ThreadPoolExecutor threadPool;
@SuppressWarnings("rawtypes")
- private static final LinkedList<Future<BSPPeerImpl>> futureList = new LinkedList<Future<BSPPeerImpl>>();
+ private static final LinkedList<Future<BSPPeerImpl>> FUTURE_LIST = new LinkedList<Future<BSPPeerImpl>>();
private String jobFile;
private String jobName;
@@ -152,7 +146,7 @@ public class LocalBSPRunner implements J
peerNames = new String[numBspTask];
for (int i = 0; i < numBspTask; i++) {
peerNames[i] = "local:" + i;
- futureList.add(threadPool.submit(new BSPRunner(new Configuration(conf),
+ FUTURE_LIST.add(threadPool.submit(new BSPRunner(new Configuration(conf),
job, i, splits)));
globalCounters.incrCounter(JobInProgress.JobCounter.LAUNCHED_TASKS, 1L);
}
@@ -302,7 +296,7 @@ public class LocalBSPRunner implements J
@Override
public void run() {
boolean success = true;
- for (Future<BSPPeerImpl> future : futureList) {
+ for (Future<BSPPeerImpl> future : FUTURE_LIST) {
try {
BSPPeerImpl bspPeerImpl = future.get();
currentJobStatus.getCounter().incrAllCounters(
@@ -327,53 +321,17 @@ public class LocalBSPRunner implements J
}
- public static class LocalMessageManager<M extends Writable> implements
- MessageManager<M> {
+ public static class LocalMessageManager<M extends Writable> extends
+ AbstractMessageManager<M> {
@SuppressWarnings("rawtypes")
- private static final ConcurrentHashMap<InetSocketAddress, LocalMessageManager> managerMap = new ConcurrentHashMap<InetSocketAddress, LocalBSPRunner.LocalMessageManager>();
-
- private final HashMap<InetSocketAddress, MessageQueue<M>> localOutgoingMessages = new HashMap<InetSocketAddress, MessageQueue<M>>();
- private static final ConcurrentHashMap<String, InetSocketAddress> socketCache = new ConcurrentHashMap<String, InetSocketAddress>();
- private final LinkedBlockingDeque<M> localIncomingMessages = new LinkedBlockingDeque<M>();
-
- private BSPPeer<?, ?, ?, ?, M> peer;
+ private static final ConcurrentHashMap<InetSocketAddress, LocalMessageManager> MANAGER_MAP = new ConcurrentHashMap<InetSocketAddress, LocalBSPRunner.LocalMessageManager>();
@Override
public void init(TaskAttemptID attemptId, BSPPeer<?, ?, ?, ?, M> peer,
Configuration conf, InetSocketAddress peerAddress) {
- this.peer = peer;
- managerMap.put(peerAddress, this);
- }
-
- @Override
- public void close() {
-
- }
-
- @Override
- public M getCurrentMessage() throws IOException {
- if (localIncomingMessages.isEmpty()) {
- return null;
- } else {
- return localIncomingMessages.pop();
- }
- }
-
- @Override
- public void send(String peerName, M msg) throws IOException {
- InetSocketAddress inetSocketAddress = socketCache.get(peerName);
- if (inetSocketAddress == null) {
- inetSocketAddress = BSPNetUtils.getAddress(peerName);
- socketCache.put(peerName, inetSocketAddress);
- }
- MessageQueue<M> msgs = localOutgoingMessages.get(inetSocketAddress);
- if (msgs == null) {
- msgs = new MemoryQueue<M>();
- }
- msgs.add(msg);
- peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_SENT, 1L);
- localOutgoingMessages.put(inetSocketAddress, msgs);
+ super.init(attemptId, peer, conf, peerAddress);
+ MANAGER_MAP.put(peerAddress, this);
}
@SuppressWarnings("unchecked")
@@ -381,33 +339,11 @@ public class LocalBSPRunner implements J
public void transfer(InetSocketAddress addr, BSPMessageBundle<M> bundle)
throws IOException {
for (M value : bundle.getMessages()) {
- managerMap.get(addr).localIncomingMessages.add(value);
+ MANAGER_MAP.get(addr).localQueueForNextIteration.add(value);
peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED,
1L);
}
}
-
- @Override
- public Iterator<Entry<InetSocketAddress, MessageQueue<M>>> getMessageIterator() {
- return localOutgoingMessages.entrySet().iterator();
- }
-
- @Override
- public void clearOutgoingQueues() {
- localOutgoingMessages.clear();
- }
-
- @Override
- public int getNumCurrentMessages() {
- return localIncomingMessages.size();
- }
-
- @Override
- public void finishSendPhase() throws IOException {
- // TODO Auto-generated method stub
-
- }
-
}
public static class LocalUmbilical implements BSPPeerProtocol {