You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2015/02/03 05:35:39 UTC

svn commit: r1656624 - in /hama/trunk: core/src/main/java/org/apache/hama/bsp/message/ core/src/main/java/org/apache/hama/util/ graph/src/main/java/org/apache/hama/graph/

Author: edwardyoon
Date: Tue Feb  3 04:35:39 2015
New Revision: 1656624

URL: http://svn.apache.org/r1656624
Log:
Minor refactor

Added:
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractOutgoingMessageManager.java
Modified:
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java
    hama/trunk/core/src/main/java/org/apache/hama/util/BSPNetUtils.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java

Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractOutgoingMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractOutgoingMessageManager.java?rev=1656624&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractOutgoingMessageManager.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractOutgoingMessageManager.java Tue Feb  3 04:35:39 2015
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message;
+
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.BSPMessageBundle;
+import org.apache.hama.bsp.message.compress.BSPMessageCompressor;
+import org.apache.hama.util.BSPNetUtils;
+
+/** Base outgoing-message manager: caches peer addresses and per-peer bundles. */
+public abstract class AbstractOutgoingMessageManager<M extends Writable>
+    implements OutgoingMessageManager<M> {
+
+  protected HamaConfiguration conf;
+  protected BSPMessageCompressor<M> compressor;
+
+  /** Peer name to resolved socket address. */
+  protected final HashMap<String, InetSocketAddress> peerSocketCache = new HashMap<String, InetSocketAddress>();
+  /** Socket address to the bundle of messages queued for that peer. */
+  protected HashMap<InetSocketAddress, BSPMessageBundle<M>> outgoingBundles = new HashMap<InetSocketAddress, BSPMessageBundle<M>>();
+
+  /** Resolves (and caches) the peer address, creating its bundle on first use. */
+  protected InetSocketAddress getSocketAddress(String peerName) {
+    if (!peerSocketCache.containsKey(peerName)) {
+      peerSocketCache.put(peerName, BSPNetUtils.getAddress(peerName));
+    }
+    final InetSocketAddress address = peerSocketCache.get(peerName);
+    if (outgoingBundles.containsKey(address)) {
+      return address;
+    }
+    final BSPMessageBundle<M> fresh = new BSPMessageBundle<M>();
+    final boolean compress = conf.getBoolean("hama.messenger.runtime.compression", false);
+    if (compress) {
+      fresh.setCompressor(compressor, conf.getLong("hama.messenger.compression.threshold", 128));
+    }
+    outgoingBundles.put(address, fresh);
+    return address;
+  }
+}

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java?rev=1656624&r1=1656623&r2=1656624&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java Tue Feb  3 04:35:39 2015
@@ -18,42 +18,32 @@
 package org.apache.hama.bsp.message;
 
 import java.net.InetSocketAddress;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map.Entry;
 
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hama.Constants;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.BSPMessageBundle;
 import org.apache.hama.bsp.Combiner;
 import org.apache.hama.bsp.message.compress.BSPMessageCompressor;
-import org.apache.hama.util.BSPNetUtils;
-import org.apache.hama.util.ReflectionUtils;
 
-public class OutgoingPOJOMessageBundle<M extends Writable> implements
-    OutgoingMessageManager<M> {
+public class OutgoingPOJOMessageBundle<M extends Writable> extends
+    AbstractOutgoingMessageManager<M> {
 
-  private HamaConfiguration conf;
-  private BSPMessageCompressor<M> compressor;
   private Combiner<M> combiner;
-  private final HashMap<String, InetSocketAddress> peerSocketCache = new HashMap<String, InetSocketAddress>();
-  private HashMap<InetSocketAddress, BSPMessageBundle<M>> outgoingBundles = new HashMap<InetSocketAddress, BSPMessageBundle<M>>();
 
   @SuppressWarnings("unchecked")
   @Override
   public void init(HamaConfiguration conf, BSPMessageCompressor<M> compressor) {
     this.conf = conf;
     this.compressor = compressor;
+    // Resolve the combiner through its configuration key, not its class name.
     final String combinerName = conf.get(Constants.COMBINER_CLASS);
     if (combinerName != null) {
-      try {
-        this.combiner = (Combiner<M>) ReflectionUtils.newInstance(conf
-            .getClassByName(combinerName));
-      } catch (ClassNotFoundException e) {
-        // TODO Auto-generated catch block
-        e.printStackTrace();
-      }
+      this.combiner = (Combiner<M>) ReflectionUtils.newInstance(
+          conf.getClass(Constants.COMBINER_CLASS, Combiner.class), conf);
     }
   }
 
@@ -74,27 +64,6 @@ public class OutgoingPOJOMessageBundle<M
     }
   }
 
-  private InetSocketAddress getSocketAddress(String peerName) {
-    InetSocketAddress targetPeerAddress = null;
-    // Get socket for target peer.
-    if (peerSocketCache.containsKey(peerName)) {
-      targetPeerAddress = peerSocketCache.get(peerName);
-    } else {
-      targetPeerAddress = BSPNetUtils.getAddress(peerName);
-      peerSocketCache.put(peerName, targetPeerAddress);
-    }
-
-    if (!outgoingBundles.containsKey(targetPeerAddress)) {
-      BSPMessageBundle<M> bundle = new BSPMessageBundle<M>();
-      if (conf.getBoolean("hama.messenger.runtime.compression", false)) {
-        bundle.setCompressor(compressor,
-            conf.getLong("hama.messenger.compression.threshold", 128));
-      }
-      outgoingBundles.put(targetPeerAddress, bundle);
-    }
-    return targetPeerAddress;
-  }
-
   @Override
   public void clear() {
     outgoingBundles.clear();

Modified: hama/trunk/core/src/main/java/org/apache/hama/util/BSPNetUtils.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/util/BSPNetUtils.java?rev=1656624&r1=1656623&r2=1656624&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/util/BSPNetUtils.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/util/BSPNetUtils.java Tue Feb  3 04:35:39 2015
@@ -437,7 +437,6 @@ public class BSPNetUtils {
    * @return InputStream for reading from the socket.
    * @throws IOException
    */
-  @SuppressWarnings("resource")
   public static InputStream getInputStream(Socket socket, long timeout)
       throws IOException {
     return (socket.getChannel() == null) ? socket.getInputStream()
@@ -491,7 +490,6 @@ public class BSPNetUtils {
    * @return OutputStream for writing to the socket.
    * @throws IOException
    */
-  @SuppressWarnings("resource")
   public static OutputStream getOutputStream(Socket socket, long timeout)
       throws IOException {
     return (socket.getChannel() == null) ? socket.getOutputStream()

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java?rev=1656624&r1=1656623&r2=1656624&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java Tue Feb  3 04:35:39 2015
@@ -30,20 +30,15 @@ import org.apache.hama.Constants;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.BSPMessageBundle;
 import org.apache.hama.bsp.Combiner;
-import org.apache.hama.bsp.message.OutgoingMessageManager;
+import org.apache.hama.bsp.message.AbstractOutgoingMessageManager;
 import org.apache.hama.bsp.message.compress.BSPMessageCompressor;
-import org.apache.hama.util.BSPNetUtils;
 
-public class OutgoingVertexMessageManager<M extends Writable> implements
-    OutgoingMessageManager<GraphJobMessage> {
+public class OutgoingVertexMessageManager<M extends Writable> extends
+    AbstractOutgoingMessageManager<GraphJobMessage> {
   protected static final Log LOG = LogFactory
       .getLog(OutgoingVertexMessageManager.class);
 
-  private HamaConfiguration conf;
-  private BSPMessageCompressor<GraphJobMessage> compressor;
   private Combiner<Writable> combiner;
-  private final HashMap<String, InetSocketAddress> peerSocketCache = new HashMap<String, InetSocketAddress>();
-  private HashMap<InetSocketAddress, BSPMessageBundle<GraphJobMessage>> outgoingBundles = new HashMap<InetSocketAddress, BSPMessageBundle<GraphJobMessage>>();
 
   private HashMap<InetSocketAddress, MessagePerVertex> storage = new HashMap<InetSocketAddress, MessagePerVertex>();
 
@@ -53,9 +48,9 @@ public class OutgoingVertexMessageManage
       BSPMessageCompressor<GraphJobMessage> compressor) {
     this.conf = conf;
     this.compressor = compressor;
+
     if (!conf.getClass(Constants.COMBINER_CLASS, Combiner.class).equals(
         Combiner.class)) {
-      LOG.debug("Combiner class: " + conf.get(Constants.COMBINER_CLASS));
 
       combiner = (Combiner<Writable>) org.apache.hadoop.util.ReflectionUtils
           .newInstance(conf.getClass(Constants.COMBINER_CLASS, Combiner.class),
@@ -67,7 +62,6 @@ public class OutgoingVertexMessageManage
   @Override
   public void addMessage(String peerName, GraphJobMessage msg) {
     InetSocketAddress targetPeerAddress = getSocketAddress(peerName);
-
     if (msg.isVertexMessage()) {
       WritableComparable vertexID = msg.getVertexId();
 
@@ -86,33 +80,11 @@ public class OutgoingVertexMessageManage
             new GraphJobMessage(vertexID, combiner.combine(msgPerVertex.get(
                 vertexID).getVertexValue())));
       }
-
     } else {
       outgoingBundles.get(targetPeerAddress).addMessage(msg);
     }
   }
 
-  private InetSocketAddress getSocketAddress(String peerName) {
-    InetSocketAddress targetPeerAddress = null;
-    // Get socket for target peer.
-    if (peerSocketCache.containsKey(peerName)) {
-      targetPeerAddress = peerSocketCache.get(peerName);
-    } else {
-      targetPeerAddress = BSPNetUtils.getAddress(peerName);
-      peerSocketCache.put(peerName, targetPeerAddress);
-    }
-
-    if (!outgoingBundles.containsKey(targetPeerAddress)) {
-      BSPMessageBundle<GraphJobMessage> bundle = new BSPMessageBundle<GraphJobMessage>();
-      if (conf.getBoolean("hama.messenger.runtime.compression", false)) {
-        bundle.setCompressor(compressor,
-            conf.getLong("hama.messenger.compression.threshold", 128));
-      }
-      outgoingBundles.put(targetPeerAddress, bundle);
-    }
-    return targetPeerAddress;
-  }
-
   @Override
   public void clear() {
     outgoingBundles.clear();
@@ -153,5 +125,4 @@ public class OutgoingVertexMessageManage
 
     };
   }
-
 }