You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2015/02/03 05:35:39 UTC
svn commit: r1656624 - in /hama/trunk:
core/src/main/java/org/apache/hama/bsp/message/
core/src/main/java/org/apache/hama/util/
graph/src/main/java/org/apache/hama/graph/
Author: edwardyoon
Date: Tue Feb 3 04:35:39 2015
New Revision: 1656624
URL: http://svn.apache.org/r1656624
Log:
Minor refactor
Added:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractOutgoingMessageManager.java
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java
hama/trunk/core/src/main/java/org/apache/hama/util/BSPNetUtils.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java
Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractOutgoingMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractOutgoingMessageManager.java?rev=1656624&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractOutgoingMessageManager.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractOutgoingMessageManager.java Tue Feb 3 04:35:39 2015
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message;
+
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.BSPMessageBundle;
+import org.apache.hama.bsp.message.compress.BSPMessageCompressor;
+import org.apache.hama.util.BSPNetUtils;
+
+public abstract class AbstractOutgoingMessageManager<M extends Writable>
+ implements OutgoingMessageManager<M> {
+
+ protected HamaConfiguration conf;
+ protected BSPMessageCompressor<M> compressor;
+
+ protected final HashMap<String, InetSocketAddress> peerSocketCache = new HashMap<String, InetSocketAddress>();
+ protected HashMap<InetSocketAddress, BSPMessageBundle<M>> outgoingBundles = new HashMap<InetSocketAddress, BSPMessageBundle<M>>();
+
+ protected InetSocketAddress getSocketAddress(String peerName) {
+ InetSocketAddress targetPeerAddress = null;
+ // Get socket for target peer.
+ if (peerSocketCache.containsKey(peerName)) {
+ targetPeerAddress = peerSocketCache.get(peerName);
+ } else {
+ targetPeerAddress = BSPNetUtils.getAddress(peerName);
+ peerSocketCache.put(peerName, targetPeerAddress);
+ }
+
+ if (!outgoingBundles.containsKey(targetPeerAddress)) {
+ BSPMessageBundle<M> bundle = new BSPMessageBundle<M>();
+ if (conf.getBoolean("hama.messenger.runtime.compression", false)) {
+ bundle.setCompressor(compressor,
+ conf.getLong("hama.messenger.compression.threshold", 128));
+ }
+ outgoingBundles.put(targetPeerAddress, bundle);
+ }
+ return targetPeerAddress;
+ }
+}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java?rev=1656624&r1=1656623&r2=1656624&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java Tue Feb 3 04:35:39 2015
@@ -18,42 +18,32 @@
package org.apache.hama.bsp.message;
import java.net.InetSocketAddress;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.Map.Entry;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.BSPMessageBundle;
import org.apache.hama.bsp.Combiner;
import org.apache.hama.bsp.message.compress.BSPMessageCompressor;
-import org.apache.hama.util.BSPNetUtils;
-import org.apache.hama.util.ReflectionUtils;
-public class OutgoingPOJOMessageBundle<M extends Writable> implements
- OutgoingMessageManager<M> {
+public class OutgoingPOJOMessageBundle<M extends Writable> extends
+ AbstractOutgoingMessageManager<M> {
- private HamaConfiguration conf;
- private BSPMessageCompressor<M> compressor;
private Combiner<M> combiner;
- private final HashMap<String, InetSocketAddress> peerSocketCache = new HashMap<String, InetSocketAddress>();
- private HashMap<InetSocketAddress, BSPMessageBundle<M>> outgoingBundles = new HashMap<InetSocketAddress, BSPMessageBundle<M>>();
@SuppressWarnings("unchecked")
@Override
public void init(HamaConfiguration conf, BSPMessageCompressor<M> compressor) {
this.conf = conf;
this.compressor = compressor;
+
final String combinerName = conf.get(Constants.COMBINER_CLASS);
if (combinerName != null) {
- try {
- this.combiner = (Combiner<M>) ReflectionUtils.newInstance(conf
- .getClassByName(combinerName));
- } catch (ClassNotFoundException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
+ this.combiner = (Combiner<M>) ReflectionUtils.newInstance(
+ conf.getClass(combinerName, Combiner.class), conf);
}
}
@@ -74,27 +64,6 @@ public class OutgoingPOJOMessageBundle<M
}
}
- private InetSocketAddress getSocketAddress(String peerName) {
- InetSocketAddress targetPeerAddress = null;
- // Get socket for target peer.
- if (peerSocketCache.containsKey(peerName)) {
- targetPeerAddress = peerSocketCache.get(peerName);
- } else {
- targetPeerAddress = BSPNetUtils.getAddress(peerName);
- peerSocketCache.put(peerName, targetPeerAddress);
- }
-
- if (!outgoingBundles.containsKey(targetPeerAddress)) {
- BSPMessageBundle<M> bundle = new BSPMessageBundle<M>();
- if (conf.getBoolean("hama.messenger.runtime.compression", false)) {
- bundle.setCompressor(compressor,
- conf.getLong("hama.messenger.compression.threshold", 128));
- }
- outgoingBundles.put(targetPeerAddress, bundle);
- }
- return targetPeerAddress;
- }
-
@Override
public void clear() {
outgoingBundles.clear();
Modified: hama/trunk/core/src/main/java/org/apache/hama/util/BSPNetUtils.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/util/BSPNetUtils.java?rev=1656624&r1=1656623&r2=1656624&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/util/BSPNetUtils.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/util/BSPNetUtils.java Tue Feb 3 04:35:39 2015
@@ -437,7 +437,6 @@ public class BSPNetUtils {
* @return InputStream for reading from the socket.
* @throws IOException
*/
- @SuppressWarnings("resource")
public static InputStream getInputStream(Socket socket, long timeout)
throws IOException {
return (socket.getChannel() == null) ? socket.getInputStream()
@@ -491,7 +490,6 @@ public class BSPNetUtils {
* @return OutputStream for writing to the socket.
* @throws IOException
*/
- @SuppressWarnings("resource")
public static OutputStream getOutputStream(Socket socket, long timeout)
throws IOException {
return (socket.getChannel() == null) ? socket.getOutputStream()
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java?rev=1656624&r1=1656623&r2=1656624&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java Tue Feb 3 04:35:39 2015
@@ -30,20 +30,15 @@ import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.BSPMessageBundle;
import org.apache.hama.bsp.Combiner;
-import org.apache.hama.bsp.message.OutgoingMessageManager;
+import org.apache.hama.bsp.message.AbstractOutgoingMessageManager;
import org.apache.hama.bsp.message.compress.BSPMessageCompressor;
-import org.apache.hama.util.BSPNetUtils;
-public class OutgoingVertexMessageManager<M extends Writable> implements
- OutgoingMessageManager<GraphJobMessage> {
+public class OutgoingVertexMessageManager<M extends Writable> extends
+ AbstractOutgoingMessageManager<GraphJobMessage> {
protected static final Log LOG = LogFactory
.getLog(OutgoingVertexMessageManager.class);
- private HamaConfiguration conf;
- private BSPMessageCompressor<GraphJobMessage> compressor;
private Combiner<Writable> combiner;
- private final HashMap<String, InetSocketAddress> peerSocketCache = new HashMap<String, InetSocketAddress>();
- private HashMap<InetSocketAddress, BSPMessageBundle<GraphJobMessage>> outgoingBundles = new HashMap<InetSocketAddress, BSPMessageBundle<GraphJobMessage>>();
private HashMap<InetSocketAddress, MessagePerVertex> storage = new HashMap<InetSocketAddress, MessagePerVertex>();
@@ -53,9 +48,9 @@ public class OutgoingVertexMessageManage
BSPMessageCompressor<GraphJobMessage> compressor) {
this.conf = conf;
this.compressor = compressor;
+
if (!conf.getClass(Constants.COMBINER_CLASS, Combiner.class).equals(
Combiner.class)) {
- LOG.debug("Combiner class: " + conf.get(Constants.COMBINER_CLASS));
combiner = (Combiner<Writable>) org.apache.hadoop.util.ReflectionUtils
.newInstance(conf.getClass(Constants.COMBINER_CLASS, Combiner.class),
@@ -67,7 +62,6 @@ public class OutgoingVertexMessageManage
@Override
public void addMessage(String peerName, GraphJobMessage msg) {
InetSocketAddress targetPeerAddress = getSocketAddress(peerName);
-
if (msg.isVertexMessage()) {
WritableComparable vertexID = msg.getVertexId();
@@ -86,33 +80,11 @@ public class OutgoingVertexMessageManage
new GraphJobMessage(vertexID, combiner.combine(msgPerVertex.get(
vertexID).getVertexValue())));
}
-
} else {
outgoingBundles.get(targetPeerAddress).addMessage(msg);
}
}
- private InetSocketAddress getSocketAddress(String peerName) {
- InetSocketAddress targetPeerAddress = null;
- // Get socket for target peer.
- if (peerSocketCache.containsKey(peerName)) {
- targetPeerAddress = peerSocketCache.get(peerName);
- } else {
- targetPeerAddress = BSPNetUtils.getAddress(peerName);
- peerSocketCache.put(peerName, targetPeerAddress);
- }
-
- if (!outgoingBundles.containsKey(targetPeerAddress)) {
- BSPMessageBundle<GraphJobMessage> bundle = new BSPMessageBundle<GraphJobMessage>();
- if (conf.getBoolean("hama.messenger.runtime.compression", false)) {
- bundle.setCompressor(compressor,
- conf.getLong("hama.messenger.compression.threshold", 128));
- }
- outgoingBundles.put(targetPeerAddress, bundle);
- }
- return targetPeerAddress;
- }
-
@Override
public void clear() {
outgoingBundles.clear();
@@ -153,5 +125,4 @@ public class OutgoingVertexMessageManage
};
}
-
}