Posted to commits@hama.apache.org by ed...@apache.org on 2013/04/09 03:28:06 UTC

svn commit: r1465852 [15/15] - in /hama/trunk: ./ bin/ c++/ c++/pipes/ c++/pipes/api/ c++/pipes/api/hama/ c++/pipes/debug/ c++/pipes/impl/ c++/utils/ c++/utils/api/ c++/utils/api/hadoop/ c++/utils/impl/ c++/utils/m4/ core/src/main/java/org/apache/hama/...

Added: hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/UplinkReader.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/UplinkReader.java?rev=1465852&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/UplinkReader.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/UplinkReader.java Tue Apr  9 01:28:04 2013
@@ -0,0 +1,516 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.pipes.protocol;
+
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.AbstractMap;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hama.Constants;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.sync.SyncException;
+import org.apache.hama.util.KeyValuePair;
+
+public class UplinkReader<K1 extends Writable, V1 extends Writable, K2 extends Writable, V2 extends Writable>
+    extends Thread {
+
+  private static final Log LOG = LogFactory.getLog(UplinkReader.class);
+
+  protected DataInputStream inStream;
+  private K2 key;
+  private V2 value;
+  
+  private BinaryProtocol<K1, V1, K2, V2> binProtocol;
+  private BSPPeer<K1, V1, K2, V2, BytesWritable> peer = null;
+  private Configuration conf;
+  
+  private Map<Integer, Entry<SequenceFile.Reader, Entry<String, String>>> sequenceFileReaders;
+  private Map<Integer, Entry<SequenceFile.Writer, Entry<String, String>>> sequenceFileWriters;
+
+  @SuppressWarnings("unchecked")
+  public UplinkReader(BinaryProtocol<K1, V1, K2, V2> binaryProtocol,
+      Configuration conf, InputStream stream) throws IOException {
+
+    this.binProtocol = binaryProtocol;
+    this.conf = conf;
+
+    this.inStream = new DataInputStream(new BufferedInputStream(stream,
+        BinaryProtocol.BUFFER_SIZE));
+
+    this.key = (K2) ReflectionUtils.newInstance((Class<? extends K2>) conf
+        .getClass("bsp.output.key.class", Object.class), conf);
+
+    this.value = (V2) ReflectionUtils.newInstance((Class<? extends V2>) conf
+        .getClass("bsp.output.value.class", Object.class), conf);
+
+    this.sequenceFileReaders = new HashMap<Integer, Entry<SequenceFile.Reader, Entry<String, String>>>();
+    this.sequenceFileWriters = new HashMap<Integer, Entry<SequenceFile.Writer, Entry<String, String>>>();
+  }
+
+  public UplinkReader(BinaryProtocol<K1, V1, K2, V2> binaryProtocol,
+      BSPPeer<K1, V1, K2, V2, BytesWritable> peer, InputStream stream)
+      throws IOException {
+    this(binaryProtocol, peer.getConfiguration(), stream);
+    this.peer = peer;
+  }
+
+  private boolean isPeerAvailable() {
+    return this.peer != null;
+  }
+
+  public void closeConnection() throws IOException {
+    inStream.close();
+  }
+
+  @Override
+  public void run() {
+    while (true) {
+      try {
+        if (Thread.currentThread().isInterrupted()) {
+          throw new InterruptedException();
+        }
+
+        int cmd = readCommand();
+        if (cmd == -1)
+          continue;
+        LOG.debug("Handling uplink command " + cmd);
+
+        if (cmd == MessageType.WRITE_KEYVALUE.code && isPeerAvailable()) { // INCOMING
+          writeKeyValue();
+        } else if (cmd == MessageType.READ_KEYVALUE.code && isPeerAvailable()) { // OUTGOING
+          readKeyValue();
+        } else if (cmd == MessageType.INCREMENT_COUNTER.code
+            && isPeerAvailable()) { // INCOMING
+          incrementCounter();
+        } else if (cmd == MessageType.REGISTER_COUNTER.code
+            && isPeerAvailable()) { // INCOMING
+          /*
+           * Not used in Hama. Hadoop Pipes uses it, presumably for
+           * performance: register the counter once, then skip transferring
+           * group and name on each INCREMENT.
+           */
+        } else if (cmd == MessageType.TASK_DONE.code) { // INCOMING
+          synchronized (binProtocol.hasTaskLock) {
+            binProtocol.setHasTask(false);
+            LOG.debug("Got MessageType.TASK_DONE");
+            binProtocol.hasTaskLock.notify();
+          }
+        } else if (cmd == MessageType.DONE.code) { // INCOMING
+          LOG.debug("Pipe child done");
+          return;
+        } else if (cmd == MessageType.SEND_MSG.code && isPeerAvailable()) { // INCOMING
+          sendMessage();
+        } else if (cmd == MessageType.GET_MSG_COUNT.code && isPeerAvailable()) { // OUTGOING
+          getMessageCount();
+        } else if (cmd == MessageType.GET_MSG.code && isPeerAvailable()) { // OUTGOING
+          getMessage();
+        } else if (cmd == MessageType.SYNC.code && isPeerAvailable()) { // INCOMING
+          sync();
+        } else if (cmd == MessageType.GET_ALL_PEERNAME.code
+            && isPeerAvailable()) { // OUTGOING
+          getAllPeerNames();
+        } else if (cmd == MessageType.GET_PEERNAME.code && isPeerAvailable()) { // OUTGOING
+          getPeerName();
+        } else if (cmd == MessageType.GET_PEER_INDEX.code && isPeerAvailable()) { // OUTGOING
+          getPeerIndex();
+        } else if (cmd == MessageType.GET_PEER_COUNT.code && isPeerAvailable()) { // OUTGOING
+          getPeerCount();
+        } else if (cmd == MessageType.GET_SUPERSTEP_COUNT.code
+            && isPeerAvailable()) { // OUTGOING
+          getSuperstepCount();
+        } else if (cmd == MessageType.REOPEN_INPUT.code && isPeerAvailable()) { // INCOMING
+          reopenInput();
+        } else if (cmd == MessageType.CLEAR.code && isPeerAvailable()) { // INCOMING
+          LOG.debug("Got MessageType.CLEAR");
+          peer.clear();
+          /* SequenceFileConnector Implementation */
+        } else if (cmd == MessageType.SEQFILE_OPEN.code) { // OUTGOING
+          seqFileOpen();
+        } else if (cmd == MessageType.SEQFILE_READNEXT.code) { // OUTGOING
+          seqFileReadNext();
+        } else if (cmd == MessageType.SEQFILE_APPEND.code) { // INCOMING
+          seqFileAppend();
+        } else if (cmd == MessageType.SEQFILE_CLOSE.code) { // OUTGOING
+          seqFileClose();
+          /* SequenceFileConnector Implementation */
+        } else if (cmd == MessageType.PARTITION_RESPONSE.code) { // INCOMING
+          partitionResponse();
+        } else {
+          throw new IOException("Bad command code: " + cmd);
+        }
+
+      } catch (InterruptedException e) {
+        return;
+      } catch (Throwable e) {
+        onError(e);
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  protected void onError(Throwable e) {
+    LOG.error(StringUtils.stringifyException(e));
+  }
+
+  public int readCommand() throws IOException {
+    return WritableUtils.readVInt(inStream);
+  }
+
+  public void reopenInput() throws IOException {
+    LOG.debug("Got MessageType.REOPEN_INPUT");
+    peer.reopenInput();
+  }
+
+  public void getSuperstepCount() throws IOException {
+    DataOutputStream stream = binProtocol.getStream();
+    WritableUtils.writeVInt(stream, MessageType.GET_SUPERSTEP_COUNT.code);
+    WritableUtils.writeVLong(stream, peer.getSuperstepCount());
+    binProtocol.flush();
+
+    LOG.debug("Responded MessageType.GET_SUPERSTEP_COUNT - SuperstepCount: "
+        + peer.getSuperstepCount());
+  }
+
+  public void getPeerCount() throws IOException {
+    DataOutputStream stream = binProtocol.getStream();
+    WritableUtils.writeVInt(stream, MessageType.GET_PEER_COUNT.code);
+    WritableUtils.writeVInt(stream, peer.getNumPeers());
+    binProtocol.flush();
+    LOG.debug("Responded MessageType.GET_PEER_COUNT - NumPeers: "
+        + peer.getNumPeers());
+  }
+
+  public void getPeerIndex() throws IOException {
+    DataOutputStream stream = binProtocol.getStream();
+    WritableUtils.writeVInt(stream, MessageType.GET_PEER_INDEX.code);
+    WritableUtils.writeVInt(stream, peer.getPeerIndex());
+    binProtocol.flush();
+    LOG.debug("Responded MessageType.GET_PEER_INDEX - PeerIndex: "
+        + peer.getPeerIndex());
+  }
+
+  public void getPeerName() throws IOException {
+    DataOutputStream stream = binProtocol.getStream();
+    int id = WritableUtils.readVInt(inStream);
+    LOG.debug("Got MessageType.GET_PEERNAME id: " + id);
+
+    WritableUtils.writeVInt(stream, MessageType.GET_PEERNAME.code);
+    if (id == -1) { // -1 indicates get own PeerName
+      Text.writeString(stream, peer.getPeerName());
+      LOG.debug("Responded MessageType.GET_PEERNAME - Get Own PeerName: "
+          + peer.getPeerName());
+
+    } else if ((id < -1) || (id >= peer.getNumPeers())) {
+      // if no peer name exists for this index, write an empty string
+      Text.writeString(stream, "");
+      LOG.debug("Responded MessageType.GET_PEERNAME - Empty PeerName!");
+
+    } else {
+      Text.writeString(stream, peer.getPeerName(id));
+      LOG.debug("Responded MessageType.GET_PEERNAME - PeerName: "
+          + peer.getPeerName(id));
+    }
+    binProtocol.flush();
+  }
+
+  public void getAllPeerNames() throws IOException {
+    DataOutputStream stream = binProtocol.getStream();
+    LOG.debug("Got MessageType.GET_ALL_PEERNAME");
+    WritableUtils.writeVInt(stream, MessageType.GET_ALL_PEERNAME.code);
+    WritableUtils.writeVInt(stream, peer.getAllPeerNames().length);
+    for (String s : peer.getAllPeerNames())
+      Text.writeString(stream, s);
+
+    binProtocol.flush();
+    LOG.debug("Responded MessageType.GET_ALL_PEERNAME - peerNamesCount: "
+        + peer.getAllPeerNames().length);
+  }
+
+  public void sync() throws IOException, SyncException, InterruptedException {
+    LOG.debug("Got MessageType.SYNC");
+    peer.sync(); // this call blocks
+  }
+
+  public void getMessage() throws IOException {
+    DataOutputStream stream = binProtocol.getStream();
+    LOG.debug("Got MessageType.GET_MSG");
+    WritableUtils.writeVInt(stream, MessageType.GET_MSG.code);
+    BytesWritable msg = peer.getCurrentMessage();
+    if (msg != null)
+      binProtocol.writeObject(msg);
+
+    binProtocol.flush();
+    LOG.debug("Responded MessageType.GET_MSG - Message(BytesWritable) ");// +msg);
+  }
+
+  public void getMessageCount() throws IOException {
+    DataOutputStream stream = binProtocol.getStream();
+    WritableUtils.writeVInt(stream, MessageType.GET_MSG_COUNT.code);
+    WritableUtils.writeVInt(stream, peer.getNumCurrentMessages());
+    binProtocol.flush();
+    LOG.debug("Responded MessageType.GET_MSG_COUNT - Count: "
+        + peer.getNumCurrentMessages());
+  }
+
+  public void sendMessage() throws IOException {
+    String peerName = Text.readString(inStream);
+    BytesWritable msg = new BytesWritable();
+    readObject(msg);
+    LOG.debug("Got MessageType.SEND_MSG to peerName: " + peerName);
+    peer.send(peerName, msg);
+  }
+
+  public void incrementCounter() throws IOException {
+    // int id = WritableUtils.readVInt(inStream);
+    String group = Text.readString(inStream);
+    String name = Text.readString(inStream);
+    long amount = WritableUtils.readVLong(inStream);
+    peer.incrementCounter(name, group, amount);
+  }
+
+  public void readKeyValue() throws IOException {
+    DataOutputStream stream = binProtocol.getStream();
+    boolean nullinput = peer.getConfiguration().get(
+        Constants.INPUT_FORMAT_CLASS) == null
+        || peer.getConfiguration().get(Constants.INPUT_FORMAT_CLASS)
+            .equals("org.apache.hama.bsp.NullInputFormat");
+
+    if (!nullinput) {
+
+      KeyValuePair<K1, V1> pair = peer.readNext();
+
+      WritableUtils.writeVInt(stream, MessageType.READ_KEYVALUE.code);
+      if (pair != null) {
+        binProtocol.writeObject(pair.getKey());
+        binProtocol.writeObject(pair.getValue());
+
+        String valueStr = pair.getValue().toString();
+        LOG.debug("Responded MessageType.READ_KEYVALUE - Key: "
+            + pair.getKey()
+            + " Value: "
+            + ((valueStr.length() < 10) ? valueStr : valueStr.substring(0, 9)
+                + "..."));
+
+      } else {
+        Text.writeString(stream, "");
+        Text.writeString(stream, "");
+        LOG.debug("Responded MessageType.READ_KEYVALUE - EMPTY KeyValue Pair");
+      }
+      binProtocol.flush();
+
+    } else {
+      /* TODO */
+      /* Send empty Strings to show no KeyValue pair is available */
+      WritableUtils.writeVInt(stream, MessageType.READ_KEYVALUE.code);
+      Text.writeString(stream, "");
+      Text.writeString(stream, "");
+      binProtocol.flush();
+      LOG.debug("Responded MessageType.READ_KEYVALUE - EMPTY KeyValue Pair");
+    }
+  }
+
+  public void writeKeyValue() throws IOException {
+    readObject(key); // string or binary only
+    readObject(value); // string or binary only
+    if (LOG.isDebugEnabled())
+      LOG.debug("Got MessageType.WRITE_KEYVALUE - Key: " + key + " Value: "
+          + value);
+    peer.write(key, value);
+  }
+
+  public void seqFileOpen() throws IOException {
+    String path = Text.readString(inStream);
+    // option - read = "r" or write = "w"
+    String option = Text.readString(inStream);
+    // key and value Type stored in the SequenceFile
+    String keyType = Text.readString(inStream);
+    String valueType = Text.readString(inStream);
+
+    int fileID = -1;
+
+    FileSystem fs = FileSystem.get(conf);
+    if (option.equals("r")) {
+      SequenceFile.Reader reader;
+      try {
+        reader = new SequenceFile.Reader(fs, new Path(path), conf);
+        fileID = reader.hashCode();
+        sequenceFileReaders
+            .put(
+                fileID,
+                new AbstractMap.SimpleEntry<SequenceFile.Reader, Entry<String, String>>(
+                    reader, new AbstractMap.SimpleEntry<String, String>(
+                        keyType, valueType)));
+      } catch (IOException e) {
+        fileID = -1;
+      }
+
+    } else if (option.equals("w")) {
+      SequenceFile.Writer writer;
+      try {
+        writer = new SequenceFile.Writer(fs, conf, new Path(path), Text.class,
+            Text.class);
+        fileID = writer.hashCode();
+        sequenceFileWriters
+            .put(
+                fileID,
+                new AbstractMap.SimpleEntry<SequenceFile.Writer, Entry<String, String>>(
+                    writer, new AbstractMap.SimpleEntry<String, String>(
+                        keyType, valueType)));
+      } catch (IOException e) {
+        fileID = -1;
+      }
+    }
+
+    DataOutputStream stream = binProtocol.getStream();
+    WritableUtils.writeVInt(stream, MessageType.SEQFILE_OPEN.code);
+    WritableUtils.writeVInt(stream, fileID);
+    binProtocol.flush();
+    LOG.debug("Responded MessageType.SEQFILE_OPEN - FileID: " + fileID);
+  }
+
+  public void seqFileReadNext() throws IOException, ClassNotFoundException {
+    int fileID = WritableUtils.readVInt(inStream);
+    // LOG.debug("GOT MessageType.SEQFILE_READNEXT - FileID: " + fileID);
+
+    Writable key = null;
+    Writable value = null;
+
+    // Look up the reader first: fileID may be unknown (e.g. a failed
+    // SEQFILE_OPEN returned -1); then the response degrades to an empty pair.
+    if (sequenceFileReaders.containsKey(fileID)) {
+      Entry<SequenceFile.Reader, Entry<String, String>> entry = sequenceFileReaders
+          .get(fileID);
+
+      Class<?> keyType = conf.getClassLoader().loadClass(
+          entry.getValue().getKey());
+      key = (Writable) ReflectionUtils.newInstance(keyType, conf);
+
+      Class<?> valueType = conf.getClassLoader().loadClass(
+          entry.getValue().getValue());
+      value = (Writable) ReflectionUtils.newInstance(valueType, conf);
+
+      entry.getKey().next(key, value);
+    }
+
+    // RESPOND
+    DataOutputStream stream = binProtocol.getStream();
+    WritableUtils.writeVInt(stream, MessageType.SEQFILE_READNEXT.code);
+    try {
+      String k = key.toString();
+      String v = value.toString();
+      Text.writeString(stream, k);
+      Text.writeString(stream, v);
+      LOG.debug("Responded MessageType.SEQFILE_READNEXT - key: " + k
+          + " value: " + ((v.length() < 10) ? v : v.substring(0, 9) + "..."));
+
+    } catch (NullPointerException e) { // key or value is null
+
+      Text.writeString(stream, "");
+      Text.writeString(stream, "");
+      LOG.debug("Responded MessageType.SEQFILE_READNEXT - EMPTY KeyValue Pair");
+    }
+    binProtocol.flush();
+  }
+
+  public void seqFileAppend() throws IOException {
+    int fileID = WritableUtils.readVInt(inStream);
+    String keyStr = Text.readString(inStream);
+    String valueStr = Text.readString(inStream);
+
+    boolean result = false;
+    if (sequenceFileWriters.containsKey(fileID)) {
+      sequenceFileWriters.get(fileID).getKey()
+          .append(new Text(keyStr), new Text(valueStr));
+      result = true;
+    }
+
+    // RESPOND
+    DataOutputStream stream = binProtocol.getStream();
+    WritableUtils.writeVInt(stream, MessageType.SEQFILE_APPEND.code);
+    WritableUtils.writeVInt(stream, result ? 1 : 0);
+    binProtocol.flush();
+    LOG.debug("Responded MessageType.SEQFILE_APPEND - Result: " + result);
+  }
+
+  public void seqFileClose() throws IOException {
+    int fileID = WritableUtils.readVInt(inStream);
+
+    boolean result = false;
+
+    if (sequenceFileReaders.containsKey(fileID)) {
+      sequenceFileReaders.get(fileID).getKey().close();
+      result = true;
+    } else if (sequenceFileWriters.containsKey(fileID)) {
+      sequenceFileWriters.get(fileID).getKey().close();
+      result = true;
+    }
+
+    // RESPOND
+    DataOutputStream stream = binProtocol.getStream();
+    WritableUtils.writeVInt(stream, MessageType.SEQFILE_CLOSE.code);
+    WritableUtils.writeVInt(stream, result ? 1 : 0);
+    binProtocol.flush();
+    LOG.debug("Responded MessageType.SEQFILE_CLOSE - Result: " + result);
+  }
+
+  public void partitionResponse() throws IOException {
+    int partResponse = WritableUtils.readVInt(inStream);
+    synchronized (binProtocol.resultLock) {
+      binProtocol.setResult(partResponse);
+      LOG.debug("Received MessageType.PARTITION_RESPONSE - Result: "
+          + partResponse);
+      binProtocol.resultLock.notify();
+    }
+  }
+
+  protected void readObject(Writable obj) throws IOException {
+    int numBytes = readCommand();
+    byte[] buffer;
+    // For BytesWritable and Text, use the specified length to set the length
+    // this causes the "obvious" translations to work. So that if you emit
+    // a string "abc" from C++, it shows up as "abc".
+    if (obj instanceof BytesWritable) {
+      buffer = new byte[numBytes];
+      inStream.readFully(buffer);
+      ((BytesWritable) obj).set(buffer, 0, numBytes);
+    } else if (obj instanceof Text) {
+      buffer = new byte[numBytes];
+      inStream.readFully(buffer);
+      ((Text) obj).set(buffer);
+    } else if (obj instanceof NullWritable) {
+      throw new IOException(
+          "Cannot read data into NullWritable! Check OutputClasses!");
+    } else {
+      /* TODO */
+      /* IntWritable, DoubleWritable */
+      throw new IOException(
+          "Hama Pipes only supports BytesWritable and Text as key/value types!");
+      // obj.readFields(inStream);
+    }
+  }
+}
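
A note on the wire format implemented above: every uplink message starts with a
vint command code (one of the MessageType codes, read by readCommand()),
followed by the payload, with strings encoded via Text.writeString() and
numbers via WritableUtils. The self-contained sketch below round-trips one
GET_PEERNAME-style frame through that framing; the command-code value and the
peer name are made up for illustration, and the real codes live in MessageType:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableUtils;

public class UplinkFramingSketch {

  // Hypothetical code value; the real values are defined in MessageType.
  static final int GET_PEERNAME = 7;

  public static void main(String[] args) throws IOException {
    // What getPeerName() writes for id == -1: command code, then a string.
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(buffer);
    WritableUtils.writeVInt(out, GET_PEERNAME);
    Text.writeString(out, "localhost:61000");
    out.flush();

    // What the other side reads back, mirroring readCommand() and the
    // Text.readString() calls in UplinkReader.
    DataInputStream in = new DataInputStream(
        new ByteArrayInputStream(buffer.toByteArray()));
    int cmd = WritableUtils.readVInt(in);
    String peerName = Text.readString(in);
    System.out.println("cmd=" + cmd + " peerName=" + peerName);
  }
}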

Added: hama/trunk/core/src/main/java/org/apache/hama/pipes/util/DistributedCacheUtil.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/util/DistributedCacheUtil.java?rev=1465852&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/util/DistributedCacheUtil.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/util/DistributedCacheUtil.java Tue Apr  9 01:28:04 2013
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.pipes.util;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.net.URLClassLoader;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringUtils;
+
+public class DistributedCacheUtil {
+
+  private static final Log LOG = LogFactory.getLog(DistributedCacheUtil.class);
+
+  /**
+   * Transfers DistributedCache files into the local cache. Also creates
+   * symbolic links for URIs specified with a fragment if
+   * DistributedCache.getSymlink() is true.
+   * 
+   * @throws IOException If a DistributedCache file cannot be found.
+   */
+  public static final void moveLocalFiles(Configuration conf)
+      throws IOException {
+    StringBuilder files = new StringBuilder();
+    boolean first = true;
+    if (DistributedCache.getCacheFiles(conf) != null) {
+      for (URI uri : DistributedCache.getCacheFiles(conf)) {
+        if (uri != null) {
+          if (!first) {
+            files.append(",");
+          }
+          if (null != uri.getFragment() && DistributedCache.getSymlink(conf)) {
+
+            FileUtil.symLink(uri.getPath(), uri.getFragment());
+            files.append(uri.getFragment()).append(",");
+          }
+          FileSystem hdfs = FileSystem.get(conf);
+          Path pathSrc = new Path(uri.getPath());
+          // LOG.info("pathSrc: " + pathSrc);
+          if (hdfs.exists(pathSrc)) {
+            LocalFileSystem local = LocalFileSystem.getLocal(conf);
+            Path pathDst = new Path(local.getWorkingDirectory(),
+                pathSrc.getName());
+            // LOG.info("user.dir: "+System.getProperty("user.dir"));
+            // LOG.info("WorkingDirectory: "+local.getWorkingDirectory());
+            // LOG.info("pathDst: " + pathDst);
+            LOG.debug("copyToLocalFile: " + pathDst);
+            hdfs.copyToLocalFile(pathSrc, pathDst);
+            local.deleteOnExit(pathDst);
+            files.append(pathDst.toUri().getPath());
+          }
+          first = false;
+        }
+      }
+    }
+    if (files.length() > 0) {
+      DistributedCache.addLocalFiles(conf, files.toString());
+    }
+  }
+
+  /**
+   * Copies the given local files to HDFS and schedules them for deletion on
+   * exit.
+   * 
+   * @param conf the job configuration
+   * @param files a comma-separated list of local file paths
+   * @return a comma-separated list of the qualified HDFS destination paths
+   */
+  public static String addFilesToHDFS(Configuration conf, String files) {
+    if (files == null)
+      return null;
+    String[] fileArr = files.split(",");
+    String[] finalArr = new String[fileArr.length];
+
+    for (int i = 0; i < fileArr.length; i++) {
+      String tmp = fileArr[i];
+      String finalPath;
+
+      URI pathURI;
+      try {
+        pathURI = new URI(tmp);
+      } catch (URISyntaxException e) {
+        throw new IllegalArgumentException(e);
+      }
+
+      try {
+        LocalFileSystem local = LocalFileSystem.getLocal(conf);
+        Path pathSrc = new Path(pathURI);
+        // LOG.info("pathSrc: " + pathSrc);
+
+        if (local.exists(pathSrc)) {
+          FileSystem hdfs = FileSystem.get(conf);
+          Path pathDst = new Path(hdfs.getWorkingDirectory() + "/temp",
+              pathSrc.getName());
+
+          // LOG.info("WorkingDirectory: " + hdfs.getWorkingDirectory());
+          LOG.debug("copyToHDFSFile: " + pathDst);
+          hdfs.copyFromLocalFile(pathSrc, pathDst);
+          hdfs.deleteOnExit(pathDst);
+
+          finalPath = pathDst.makeQualified(hdfs).toString();
+          finalArr[i] = finalPath;
+        }
+      } catch (IOException e) {
+        LOG.error(e);
+      }
+
+    }
+    return StringUtils.arrayToString(finalArr);
+  }
+
+  /**
+   * Adds the JARs listed in the "tmpjars" configuration entry to the
+   * classpath of the given Configuration and of the current thread.
+   * 
+   * @param conf the job configuration
+   * @return the resulting classpath URLs, or null if "tmpjars" is empty
+   */
+  public static URL[] addJarsToJobClasspath(Configuration conf) {
+    URL[] classLoaderURLs = ((URLClassLoader) conf.getClassLoader()).getURLs();
+    String files = conf.get("tmpjars", "");
+
+    if (!files.isEmpty()) {
+      String[] fileArr = files.split(",");
+      URL[] libjars = new URL[fileArr.length + classLoaderURLs.length];
+
+      for (int i = 0; i < fileArr.length; i++) {
+        String tmp = fileArr[i];
+
+        URI pathURI;
+        try {
+          pathURI = new URI(tmp);
+        } catch (URISyntaxException e) {
+          throw new IllegalArgumentException(e);
+        }
+
+        try {
+          FileSystem hdfs = FileSystem.get(conf);
+          Path pathSrc = new Path(pathURI.getPath());
+          // LOG.info("pathSrc: " + pathSrc);
+
+          if (hdfs.exists(pathSrc)) {
+            LocalFileSystem local = LocalFileSystem.getLocal(conf);
+
+            // File dst = File.createTempFile(pathSrc.getName() + "-", ".jar");
+            Path pathDst = new Path(local.getWorkingDirectory(),
+                pathSrc.getName());
+
+            LOG.debug("copyToLocalFile: " + pathDst);
+            hdfs.copyToLocalFile(pathSrc, pathDst);
+            local.deleteOnExit(pathDst);
+
+            libjars[i] = new URL(local.makeQualified(pathDst).toString());
+          }
+
+        } catch (IOException ex) {
+          throw new RuntimeException("Error setting up classpath", ex);
+        }
+      }
+
+      // Add old classLoader entries
+      int index = fileArr.length;
+      for (int i = 0; i < classLoaderURLs.length; i++) {
+        libjars[index] = classLoaderURLs[i];
+        index++;
+      }
+
+      // Set classloader in current conf/thread
+      conf.setClassLoader(new URLClassLoader(libjars, conf.getClassLoader()));
+
+      Thread.currentThread().setContextClassLoader(
+          new URLClassLoader(libjars, Thread.currentThread()
+              .getContextClassLoader()));
+
+      // URL[] urls = ((URLClassLoader) conf.getClassLoader()).getURLs();
+      // for (URL u : urls)
+      // LOG.info("newClassLoader: " + u.getPath());
+
+      // Set tmpjars
+      // hdfs to local path
+      String jars = "";
+      for (int i = 0; i < fileArr.length; i++) {
+        URL url = libjars[i];
+        if (jars.length() > 0) {
+          jars += ",";
+        }
+        jars += url.toString();
+      }
+      conf.set("tmpjars", jars);
+
+      return libjars;
+    }
+    return null;
+  }
+
+}
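
Taken together, addFilesToHDFS() stages local files into HDFS under
<workingDir>/temp, and addJarsToJobClasspath() pulls the JARs named in
"tmpjars" back out and prepends them to the configuration's and the thread's
class loaders. A minimal usage sketch, assuming a reachable HDFS; the JAR path
and class name are hypothetical:

import java.net.URL;

import org.apache.hadoop.conf.Configuration;
import org.apache.hama.pipes.util.DistributedCacheUtil;

public class ClasspathSketch {

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    // Stage a local JAR into HDFS (hypothetical local path).
    String staged = DistributedCacheUtil.addFilesToHDFS(conf,
        "/tmp/my-udf.jar");
    conf.set("tmpjars", staged);

    // Copy the JAR back to the working directory and extend the classpath.
    URL[] classpath = DistributedCacheUtil.addJarsToJobClasspath(conf);

    // Classes inside the JAR now resolve through the new class loader.
    Class<?> udf = conf.getClassLoader().loadClass("com.example.MyUdf");
    System.out.println(udf + " loaded from " + classpath[0]);
  }
}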

Added: hama/trunk/core/src/main/java/org/apache/hama/pipes/util/SequenceFileDumper.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/util/SequenceFileDumper.java?rev=1465852&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/util/SequenceFileDumper.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/util/SequenceFileDumper.java Tue Apr  9 01:28:04 2013
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.pipes.util;
+
+import java.io.FileWriter;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+
+import org.apache.commons.cli.BasicParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.Parser;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.HamaConfiguration;
+//import org.apache.hama.util.GenericOptionsParser;
+
+public class SequenceFileDumper {
+
+  protected static final Log LOG = LogFactory.getLog(SequenceFileDumper.class);
+  public static final String LINE_SEP = System.getProperty("line.separator");
+
+  private SequenceFileDumper() {
+  }
+
+  /**
+   * A command line parser for the CLI-based SequenceFileDumper.
+   */
+  static class CommandLineParser {
+    private Options options = new Options();
+
+    void addOption(String longName, boolean required, String description,
+        String paramName) {
+      Option option = OptionBuilder.withArgName(paramName).hasArgs(1)
+          .withDescription(description).isRequired(required).create(longName);
+      options.addOption(option);
+    }
+
+    void addArgument(String name, boolean required, String description) {
+      Option option = OptionBuilder.withArgName(name).hasArgs(1)
+          .withDescription(description).isRequired(required).create();
+      options.addOption(option);
+
+    }
+
+    Parser createParser() {
+      Parser result = new BasicParser();
+      return result;
+    }
+
+    void printUsage() {
+      // The CLI package should do this for us, but I can't figure out how
+      // to make it print something reasonable.
+      System.out.println("hama seqdumper");
+      System.out
+          .println("  [-seqFile <path>] // The Sequence File containing the Clusters");
+      System.out
+          .println("  [-output <path>] // The output file.  If not specified, dumps to the console");
+      System.out
+          .println("  [-substring <number> // The number of chars of the FormatString() to print");
+      System.out.println("  [-count <true>] // Report the count only");
+      System.out.println("  [-help] // Print out help");
+      System.out.println();
+      //GenericOptionsParser.printGenericCommandUsage(System.out);
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    CommandLineParser cli = new CommandLineParser();
+    if (args.length == 0) {
+      cli.printUsage();
+      return;
+    }
+
+    LOG.info("DEBUG: Hama SequenceFileDumper started!");
+
+    cli.addOption("seqFile", false,
+        "The Sequence File containing the Clusters", "path");
+    cli.addOption("output", false,
+        "The output file.  If not specified, dumps to the console", "path");
+
+    cli.addOption("substring", false,
+        "The number of chars of the FormatString() to print", "number");
+    cli.addOption("count", false, "Report the count only", "number");
+    cli.addOption("help", false, "Print out help", "class");
+
+    Parser parser = cli.createParser();
+
+    try {
+      HamaConfiguration conf = new HamaConfiguration();
+
+      //GenericOptionsParser genericParser = new GenericOptionsParser(conf, args);
+
+      CommandLine cmdLine = parser.parse(cli.options, args);
+      //    genericParser.getRemainingArgs());
+      LOG.debug("DEBUG: Arguments: " + args); //genericParser.getRemainingArgs());
+
+      if (cmdLine.hasOption("help")) {
+        cli.printUsage();
+        return;
+      }
+
+      if (cmdLine.hasOption("seqFile")) {
+        Path path = new Path(cmdLine.getOptionValue("seqFile"));
+
+        FileSystem fs = FileSystem.get(path.toUri(), conf);
+        SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
+
+        Writer writer;
+        if (cmdLine.hasOption("output")) {
+          writer = new FileWriter(cmdLine.getOptionValue("output"));
+        } else {
+          writer = new OutputStreamWriter(System.out);
+        }
+        writer.append("Input Path: ").append(String.valueOf(path))
+            .append(LINE_SEP);
+
+        int sub = Integer.MAX_VALUE;
+        if (cmdLine.hasOption("substring")) {
+          sub = Integer.parseInt(cmdLine.getOptionValue("substring"));
+        }
+
+        boolean countOnly = cmdLine.hasOption("count");
+
+        Writable key = (Writable) reader.getKeyClass().newInstance();
+        Writable value = (Writable) reader.getValueClass().newInstance();
+        writer.append("Key class: ")
+            .append(String.valueOf(reader.getKeyClass()))
+            .append(" Value Class: ").append(String.valueOf(value.getClass()))
+            .append(LINE_SEP);
+        writer.flush();
+
+        long count = 0;
+        if (!countOnly) {
+          while (reader.next(key, value)) {
+            writer.append("Key: ").append(String.valueOf(key));
+            String str = value.toString();
+            writer.append(": Value: ").append(
+                str.length() > sub ? str.substring(0, sub) : str);
+            writer.write(LINE_SEP);
+            writer.flush();
+            count++;
+          }
+          writer.append("Count: ").append(String.valueOf(count))
+              .append(LINE_SEP);
+        } else {
+          while (reader.next(key, value)) {
+            count++;
+          }
+          writer.append("Count: ").append(String.valueOf(count))
+              .append(LINE_SEP);
+        }
+        writer.flush();
+
+        if (cmdLine.hasOption("output")) {
+          writer.close();
+        }
+        reader.close();
+      }
+
+    } catch (ParseException e) {
+      LOG.info("Error : " + e);
+      cli.printUsage();
+      return;
+    }
+  }
+}
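
For reference, printUsage() above corresponds to the `hama seqdumper` shell
command. The sketch below writes a tiny Text/Text SequenceFile to a
hypothetical local path and then dumps it programmatically; it assumes the
default filesystem is the local one:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hama.pipes.util.SequenceFileDumper;

public class DumperSketch {

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.getLocal(conf);
    Path file = new Path("/tmp/dumper-demo.seq"); // hypothetical path

    // Write two Text/Text records so there is something to dump.
    SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, file,
        Text.class, Text.class);
    writer.append(new Text("k1"), new Text("first value"));
    writer.append(new Text("k2"), new Text("second value"));
    writer.close();

    // Equivalent to: hama seqdumper -seqFile /tmp/dumper-demo.seq -substring 5
    SequenceFileDumper.main(new String[] { "-seqFile", file.toString(),
        "-substring", "5" });
  }
}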

Modified: hama/trunk/pom.xml
URL: http://svn.apache.org/viewvc/hama/trunk/pom.xml?rev=1465852&r1=1465851&r2=1465852&view=diff
==============================================================================
--- hama/trunk/pom.xml (original)
+++ hama/trunk/pom.xml Tue Apr  9 01:28:04 2013
@@ -302,7 +302,7 @@
         <configuration>
           <excludes>
             <exclude>.idea/**</exclude>
-	    <exclude>.git/**</exclude>
+            <exclude>.git/**</exclude>
             <exclude>.classpath/**</exclude>
             <exclude>.project</exclude>
             <exclude>**/*.asc</exclude>
@@ -314,6 +314,7 @@
             <exclude>**/src/test/resources/*.properties</exclude>
             <exclude>**dependency-reduced-pom.xml</exclude>
             <exclude>**/src/test/resources/*.txt</exclude>
+            <exclude>c++/**</exclude>
           </excludes>
         </configuration>
       </plugin>