You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by tj...@apache.org on 2012/07/20 18:18:47 UTC

svn commit: r1363862 - in /hama/trunk: CHANGES.txt core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java

Author: tjungblut
Date: Fri Jul 20 16:18:46 2012
New Revision: 1363862

URL: http://svn.apache.org/viewvc?rev=1363862&view=rev
Log:
[HAMA-608]: LocalRunner should honor the configured queues


Modified:
    hama/trunk/CHANGES.txt
    hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java

Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1363862&r1=1363861&r2=1363862&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Fri Jul 20 16:18:46 2012
@@ -6,6 +6,8 @@ Release 0.6 (unreleased changes)
 
   BUG FIXES
 
+   HAMA-608: LocalRunner should honor the configured queues (tjungblut)
+
   IMPROVEMENTS
 
 Release 0.5 - April 10, 2012 

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1363862&r1=1363861&r2=1363862&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java Fri Jul 20 16:18:46 2012
@@ -20,17 +20,13 @@ package org.apache.hama.bsp;
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.util.HashMap;
-import java.util.Iterator;
 import java.util.LinkedList;
-import java.util.Map.Entry;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.ThreadPoolExecutor;
 
 import org.apache.commons.logging.Log;
@@ -45,16 +41,14 @@ import org.apache.hama.Constants;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.BSPJobClient.RawSplit;
 import org.apache.hama.bsp.BSPMaster.State;
-import org.apache.hama.bsp.message.MemoryQueue;
+import org.apache.hama.bsp.message.AbstractMessageManager;
 import org.apache.hama.bsp.message.MessageManager;
 import org.apache.hama.bsp.message.MessageManagerFactory;
-import org.apache.hama.bsp.message.MessageQueue;
 import org.apache.hama.bsp.sync.SyncClient;
 import org.apache.hama.bsp.sync.SyncException;
 import org.apache.hama.bsp.sync.SyncServiceFactory;
 import org.apache.hama.ipc.BSPPeerProtocol;
 import org.apache.hama.ipc.JobSubmissionProtocol;
-import org.apache.hama.util.BSPNetUtils;
 
 /**
  * A multithreaded local BSP runner that can be used for debugging and local
@@ -68,7 +62,7 @@ public class LocalBSPRunner implements J
   private volatile ThreadPoolExecutor threadPool;
 
   @SuppressWarnings("rawtypes")
-  private static final LinkedList<Future<BSPPeerImpl>> futureList = new LinkedList<Future<BSPPeerImpl>>();
+  private static final LinkedList<Future<BSPPeerImpl>> FUTURE_LIST = new LinkedList<Future<BSPPeerImpl>>();
 
   private String jobFile;
   private String jobName;
@@ -152,7 +146,7 @@ public class LocalBSPRunner implements J
     peerNames = new String[numBspTask];
     for (int i = 0; i < numBspTask; i++) {
       peerNames[i] = "local:" + i;
-      futureList.add(threadPool.submit(new BSPRunner(new Configuration(conf),
+      FUTURE_LIST.add(threadPool.submit(new BSPRunner(new Configuration(conf),
           job, i, splits)));
       globalCounters.incrCounter(JobInProgress.JobCounter.LAUNCHED_TASKS, 1L);
     }
@@ -302,7 +296,7 @@ public class LocalBSPRunner implements J
     @Override
     public void run() {
       boolean success = true;
-      for (Future<BSPPeerImpl> future : futureList) {
+      for (Future<BSPPeerImpl> future : FUTURE_LIST) {
         try {
           BSPPeerImpl bspPeerImpl = future.get();
           currentJobStatus.getCounter().incrAllCounters(
@@ -327,53 +321,17 @@ public class LocalBSPRunner implements J
 
   }
 
-  public static class LocalMessageManager<M extends Writable> implements
-      MessageManager<M> {
+  public static class LocalMessageManager<M extends Writable> extends
+      AbstractMessageManager<M> {
 
     @SuppressWarnings("rawtypes")
-    private static final ConcurrentHashMap<InetSocketAddress, LocalMessageManager> managerMap = new ConcurrentHashMap<InetSocketAddress, LocalBSPRunner.LocalMessageManager>();
-
-    private final HashMap<InetSocketAddress, MessageQueue<M>> localOutgoingMessages = new HashMap<InetSocketAddress, MessageQueue<M>>();
-    private static final ConcurrentHashMap<String, InetSocketAddress> socketCache = new ConcurrentHashMap<String, InetSocketAddress>();
-    private final LinkedBlockingDeque<M> localIncomingMessages = new LinkedBlockingDeque<M>();
-
-    private BSPPeer<?, ?, ?, ?, M> peer;
+    private static final ConcurrentHashMap<InetSocketAddress, LocalMessageManager> MANAGER_MAP = new ConcurrentHashMap<InetSocketAddress, LocalBSPRunner.LocalMessageManager>();
 
     @Override
     public void init(TaskAttemptID attemptId, BSPPeer<?, ?, ?, ?, M> peer,
         Configuration conf, InetSocketAddress peerAddress) {
-      this.peer = peer;
-      managerMap.put(peerAddress, this);
-    }
-
-    @Override
-    public void close() {
-
-    }
-
-    @Override
-    public M getCurrentMessage() throws IOException {
-      if (localIncomingMessages.isEmpty()) {
-        return null;
-      } else {
-        return localIncomingMessages.pop();
-      }
-    }
-
-    @Override
-    public void send(String peerName, M msg) throws IOException {
-      InetSocketAddress inetSocketAddress = socketCache.get(peerName);
-      if (inetSocketAddress == null) {
-        inetSocketAddress = BSPNetUtils.getAddress(peerName);
-        socketCache.put(peerName, inetSocketAddress);
-      }
-      MessageQueue<M> msgs = localOutgoingMessages.get(inetSocketAddress);
-      if (msgs == null) {
-        msgs = new MemoryQueue<M>();
-      }
-      msgs.add(msg);
-      peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_SENT, 1L);
-      localOutgoingMessages.put(inetSocketAddress, msgs);
+      super.init(attemptId, peer, conf, peerAddress);
+      MANAGER_MAP.put(peerAddress, this);
     }
 
     @SuppressWarnings("unchecked")
@@ -381,33 +339,11 @@ public class LocalBSPRunner implements J
     public void transfer(InetSocketAddress addr, BSPMessageBundle<M> bundle)
         throws IOException {
       for (M value : bundle.getMessages()) {
-        managerMap.get(addr).localIncomingMessages.add(value);
+        MANAGER_MAP.get(addr).localQueueForNextIteration.add(value);
         peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED,
             1L);
       }
     }
-
-    @Override
-    public Iterator<Entry<InetSocketAddress, MessageQueue<M>>> getMessageIterator() {
-      return localOutgoingMessages.entrySet().iterator();
-    }
-
-    @Override
-    public void clearOutgoingQueues() {
-      localOutgoingMessages.clear();
-    }
-
-    @Override
-    public int getNumCurrentMessages() {
-      return localIncomingMessages.size();
-    }
-
-    @Override
-    public void finishSendPhase() throws IOException {
-      // TODO Auto-generated method stub
-
-    }
-
   }
 
   public static class LocalUmbilical implements BSPPeerProtocol {