You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2013/04/09 03:28:06 UTC
svn commit: r1465852 [15/15] - in /hama/trunk: ./ bin/ c++/ c++/pipes/
c++/pipes/api/ c++/pipes/api/hama/ c++/pipes/debug/ c++/pipes/impl/
c++/utils/ c++/utils/api/ c++/utils/api/hadoop/ c++/utils/impl/
c++/utils/m4/ core/src/main/java/org/apache/hama/...
Added: hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/UplinkReader.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/UplinkReader.java?rev=1465852&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/UplinkReader.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/UplinkReader.java Tue Apr 9 01:28:04 2013
@@ -0,0 +1,516 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.pipes.protocol;
+
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.AbstractMap;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hama.Constants;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.sync.SyncException;
+import org.apache.hama.util.KeyValuePair;
+
+public class UplinkReader<K1 extends Writable, V1 extends Writable, K2 extends Writable, V2 extends Writable>
+ extends Thread {
+
+ private static final Log LOG = LogFactory.getLog(UplinkReader.class);
+
+ protected DataInputStream inStream;
+ private K2 key;
+ private V2 value;
+
+ private BinaryProtocol<K1, V1, K2, V2> binProtocol;
+ private BSPPeer<K1, V1, K2, V2, BytesWritable> peer = null;
+ private Configuration conf;
+
+ private Map<Integer, Entry<SequenceFile.Reader, Entry<String, String>>> sequenceFileReaders;
+ private Map<Integer, Entry<SequenceFile.Writer, Entry<String, String>>> sequenceFileWriters;
+
+ @SuppressWarnings("unchecked")
+ public UplinkReader(BinaryProtocol<K1, V1, K2, V2> binaryProtocol,
+ Configuration conf, InputStream stream) throws IOException {
+
+ this.binProtocol = binaryProtocol;
+ this.conf = conf;
+
+ this.inStream = new DataInputStream(new BufferedInputStream(stream,
+ BinaryProtocol.BUFFER_SIZE));
+
+ this.key = (K2) ReflectionUtils.newInstance((Class<? extends K2>) conf
+ .getClass("bsp.output.key.class", Object.class), conf);
+
+ this.value = (V2) ReflectionUtils.newInstance((Class<? extends V2>) conf
+ .getClass("bsp.output.value.class", Object.class), conf);
+
+ this.sequenceFileReaders = new HashMap<Integer, Entry<SequenceFile.Reader, Entry<String, String>>>();
+ this.sequenceFileWriters = new HashMap<Integer, Entry<SequenceFile.Writer, Entry<String, String>>>();
+ }
+
  /**
   * Creates an uplink reader bound to a live BSP peer; the peer's
   * configuration supplies the output key/value classes.
   */
  public UplinkReader(BinaryProtocol<K1, V1, K2, V2> binaryProtocol,
      BSPPeer<K1, V1, K2, V2, BytesWritable> peer, InputStream stream)
      throws IOException {
    this(binaryProtocol, peer.getConfiguration(), stream);
    this.peer = peer;
  }
+
+ private boolean isPeerAvailable() {
+ return this.peer != null;
+ }
+
+ public void closeConnection() throws IOException {
+ inStream.close();
+ }
+
  /**
   * Main dispatch loop: reads command codes from the child process and
   * routes them to the handler methods until DONE arrives or the thread is
   * interrupted. Peer-dependent commands are ignored when no peer is set.
   */
  @Override
  public void run() {
    while (true) {
      try {
        // Cooperative shutdown: turn the interrupt flag into an exception
        // so it exits through the InterruptedException catch below.
        if (Thread.currentThread().isInterrupted()) {
          throw new InterruptedException();
        }

        int cmd = readCommand();
        // NOTE(review): readCommand() delegates to WritableUtils.readVInt,
        // which throws at end-of-stream rather than returning -1 — confirm
        // whether this sentinel can actually occur.
        if (cmd == -1)
          continue;
        LOG.debug("Handling uplink command " + cmd);

        if (cmd == MessageType.WRITE_KEYVALUE.code && isPeerAvailable()) { // INCOMING
          writeKeyValue();
        } else if (cmd == MessageType.READ_KEYVALUE.code && isPeerAvailable()) { // OUTGOING
          readKeyValue();
        } else if (cmd == MessageType.INCREMENT_COUNTER.code
            && isPeerAvailable()) { // INCOMING
          incrementCounter();
        } else if (cmd == MessageType.REGISTER_COUNTER.code
            && isPeerAvailable()) { // INCOMING
          /*
           * Is not used in HAMA -> Hadoop Pipes - maybe for performance, skip
           * transferring group and name each INCREMENT
           */
        } else if (cmd == MessageType.TASK_DONE.code) { // INCOMING
          // Wake up any thread waiting on hasTaskLock in the protocol.
          synchronized (binProtocol.hasTaskLock) {
            binProtocol.setHasTask(false);
            LOG.debug("Got MessageType.TASK_DONE");
            binProtocol.hasTaskLock.notify();
          }
        } else if (cmd == MessageType.DONE.code) { // INCOMING
          LOG.debug("Pipe child done");
          return; // normal termination of the reader thread
        } else if (cmd == MessageType.SEND_MSG.code && isPeerAvailable()) { // INCOMING
          sendMessage();
        } else if (cmd == MessageType.GET_MSG_COUNT.code && isPeerAvailable()) { // OUTGOING
          getMessageCount();
        } else if (cmd == MessageType.GET_MSG.code && isPeerAvailable()) { // OUTGOING
          getMessage();
        } else if (cmd == MessageType.SYNC.code && isPeerAvailable()) { // INCOMING
          sync();
        } else if (cmd == MessageType.GET_ALL_PEERNAME.code
            && isPeerAvailable()) { // OUTGOING
          getAllPeerNames();
        } else if (cmd == MessageType.GET_PEERNAME.code && isPeerAvailable()) { // OUTGOING
          getPeerName();
        } else if (cmd == MessageType.GET_PEER_INDEX.code && isPeerAvailable()) { // OUTGOING
          getPeerIndex();
        } else if (cmd == MessageType.GET_PEER_COUNT.code && isPeerAvailable()) { // OUTGOING
          getPeerCount();
        } else if (cmd == MessageType.GET_SUPERSTEP_COUNT.code
            && isPeerAvailable()) { // OUTGOING
          getSuperstepCount();
        } else if (cmd == MessageType.REOPEN_INPUT.code && isPeerAvailable()) { // INCOMING
          reopenInput();
        } else if (cmd == MessageType.CLEAR.code && isPeerAvailable()) { // INCOMING
          LOG.debug("Got MessageType.CLEAR");
          peer.clear();
          /* SequenceFileConnector Implementation */
        } else if (cmd == MessageType.SEQFILE_OPEN.code) { // OUTGOING
          seqFileOpen();
        } else if (cmd == MessageType.SEQFILE_READNEXT.code) { // OUTGOING
          seqFileReadNext();
        } else if (cmd == MessageType.SEQFILE_APPEND.code) { // INCOMING
          seqFileAppend();
        } else if (cmd == MessageType.SEQFILE_CLOSE.code) { // OUTGOING
          seqFileClose();
          /* SequenceFileConnector Implementation */
        } else if (cmd == MessageType.PARTITION_RESPONSE.code) { // INCOMING
          partitionResponse();
        } else {
          throw new IOException("Bad command code: " + cmd);
        }

      } catch (InterruptedException e) {
        return; // shutdown requested
      } catch (Throwable e) {
        // Any other failure is logged via onError, then propagated so the
        // task framework sees the thread die.
        onError(e);
        throw new RuntimeException(e);
      }
    }
  }
+
  /** Hook for dispatch failures; logs the full stack trace. */
  protected void onError(Throwable e) {
    LOG.error(StringUtils.stringifyException(e));
  }
+
  /**
   * Reads the next variable-length int from the downlink. Used for command
   * codes here and for byte counts in {@link #readObject(Writable)}.
   */
  public int readCommand() throws IOException {
    return WritableUtils.readVInt(inStream);
  }
+
  /** Handles REOPEN_INPUT: re-opens the peer's input split. No response is sent. */
  public void reopenInput() throws IOException {
    LOG.debug("Got MessageType.REOPEN_INPUT");
    peer.reopenInput();
  }
+
  /**
   * Handles GET_SUPERSTEP_COUNT. Wire format of the response: the command
   * code as VInt, then the superstep count as VLong, then flush.
   */
  public void getSuperstepCount() throws IOException {
    DataOutputStream stream = binProtocol.getStream();
    WritableUtils.writeVInt(stream, MessageType.GET_SUPERSTEP_COUNT.code);
    WritableUtils.writeVLong(stream, peer.getSuperstepCount());
    binProtocol.flush();

    LOG.debug("Responded MessageType.GET_SUPERSTEP_COUNT - SuperstepCount: "
        + peer.getSuperstepCount());
  }
+
  /**
   * Handles GET_PEER_COUNT. Response: command code as VInt, then the number
   * of peers as VInt, then flush.
   */
  public void getPeerCount() throws IOException {
    DataOutputStream stream = binProtocol.getStream();
    WritableUtils.writeVInt(stream, MessageType.GET_PEER_COUNT.code);
    WritableUtils.writeVInt(stream, peer.getNumPeers());
    binProtocol.flush();
    LOG.debug("Responded MessageType.GET_PEER_COUNT - NumPeers: "
        + peer.getNumPeers());
  }
+
  /**
   * Handles GET_PEER_INDEX. Response: command code as VInt, then this peer's
   * index as VInt, then flush.
   */
  public void getPeerIndex() throws IOException {
    DataOutputStream stream = binProtocol.getStream();
    WritableUtils.writeVInt(stream, MessageType.GET_PEER_INDEX.code);
    WritableUtils.writeVInt(stream, peer.getPeerIndex());
    binProtocol.flush();
    LOG.debug("Responded MessageType.GET_PEER_INDEX - PeerIndex: "
        + peer.getPeerIndex());
  }
+
  /**
   * Handles GET_PEERNAME. Reads a peer index from the downlink and responds
   * with the command code followed by one string: the own peer name for
   * id == -1, an empty string for an out-of-range id, otherwise the name of
   * the peer at that index.
   */
  public void getPeerName() throws IOException {
    DataOutputStream stream = binProtocol.getStream();
    int id = WritableUtils.readVInt(inStream);
    LOG.debug("Got MessageType.GET_PEERNAME id: " + id);

    WritableUtils.writeVInt(stream, MessageType.GET_PEERNAME.code);
    if (id == -1) { // -1 indicates get own PeerName
      Text.writeString(stream, peer.getPeerName());
      LOG.debug("Responded MessageType.GET_PEERNAME - Get Own PeerName: "
          + peer.getPeerName());

    } else if ((id < -1) || (id >= peer.getNumPeers())) {
      // if no PeerName for this index is found write emptyString
      Text.writeString(stream, "");
      LOG.debug("Responded MessageType.GET_PEERNAME - Empty PeerName!");

    } else {
      Text.writeString(stream, peer.getPeerName(id));
      LOG.debug("Responded MessageType.GET_PEERNAME - PeerName: "
          + peer.getPeerName(id));
    }
    binProtocol.flush();
  }
+
  /**
   * Handles GET_ALL_PEERNAME. Response: command code, the peer count as
   * VInt, then one string per peer name, then flush.
   */
  public void getAllPeerNames() throws IOException {
    DataOutputStream stream = binProtocol.getStream();
    LOG.debug("Got MessageType.GET_ALL_PEERNAME");
    WritableUtils.writeVInt(stream, MessageType.GET_ALL_PEERNAME.code);
    WritableUtils.writeVInt(stream, peer.getAllPeerNames().length);
    for (String s : peer.getAllPeerNames())
      Text.writeString(stream, s);

    binProtocol.flush();
    LOG.debug("Responded MessageType.GET_ALL_PEERNAME - peerNamesCount: "
        + peer.getAllPeerNames().length);
  }
+
  /** Handles SYNC: enters the BSP barrier. Blocks until all peers arrive. */
  public void sync() throws IOException, SyncException, InterruptedException {
    LOG.debug("Got MessageType.SYNC");
    peer.sync(); // this call blocks
  }
+
  /**
   * Handles GET_MSG. Response: command code, then the current message bytes
   * if one exists.
   *
   * NOTE(review): when no message is available only the command code is
   * written — verify the client can handle a response without a payload.
   */
  public void getMessage() throws IOException {
    DataOutputStream stream = binProtocol.getStream();
    LOG.debug("Got MessageType.GET_MSG");
    WritableUtils.writeVInt(stream, MessageType.GET_MSG.code);
    BytesWritable msg = peer.getCurrentMessage();
    if (msg != null)
      binProtocol.writeObject(msg);

    binProtocol.flush();
    LOG.debug("Responded MessageType.GET_MSG - Message(BytesWritable) ");// +msg);
  }
+
  /**
   * Handles GET_MSG_COUNT. Response: command code, then the number of queued
   * messages as VInt, then flush.
   */
  public void getMessageCount() throws IOException {
    DataOutputStream stream = binProtocol.getStream();
    WritableUtils.writeVInt(stream, MessageType.GET_MSG_COUNT.code);
    WritableUtils.writeVInt(stream, peer.getNumCurrentMessages());
    binProtocol.flush();
    LOG.debug("Responded MessageType.GET_MSG_COUNT - Count: "
        + peer.getNumCurrentMessages());
  }
+
  /**
   * Handles SEND_MSG. Downlink payload: target peer name as string, then a
   * length-prefixed byte blob (see readObject). Forwards it via peer.send.
   */
  public void sendMessage() throws IOException {
    String peerName = Text.readString(inStream);
    BytesWritable msg = new BytesWritable();
    readObject(msg);
    LOG.debug("Got MessageType.SEND_MSG to peerName: " + peerName);
    peer.send(peerName, msg);
  }
+
  /**
   * Handles INCREMENT_COUNTER. Downlink payload: group string, name string,
   * amount as VLong.
   *
   * NOTE(review): group is read first but the call passes (name, group,
   * amount) — verify the parameter order against BSPPeer#incrementCounter;
   * this looks swapped.
   */
  public void incrementCounter() throws IOException {
    // int id = WritableUtils.readVInt(inStream);
    String group = Text.readString(inStream);
    String name = Text.readString(inStream);
    long amount = WritableUtils.readVLong(inStream);
    peer.incrementCounter(name, group, amount);
  }
+
  /**
   * Handles READ_KEYVALUE. Responds with the command code followed by the
   * next key/value pair from the peer's input; a pair of empty strings
   * signals that no (further) input is available. When the job uses
   * NullInputFormat (or no input format), the empty pair is sent directly.
   */
  public void readKeyValue() throws IOException {
    DataOutputStream stream = binProtocol.getStream();
    // "null input" = no input format configured, or NullInputFormat.
    boolean nullinput = peer.getConfiguration().get(
        Constants.INPUT_FORMAT_CLASS) == null
        || peer.getConfiguration().get(Constants.INPUT_FORMAT_CLASS)
            .equals("org.apache.hama.bsp.NullInputFormat");

    if (!nullinput) {

      KeyValuePair<K1, V1> pair = peer.readNext();

      WritableUtils.writeVInt(stream, MessageType.READ_KEYVALUE.code);
      if (pair != null) {
        binProtocol.writeObject(pair.getKey());
        binProtocol.writeObject(pair.getValue());

        // Truncate long values to keep the debug log readable.
        String valueStr = pair.getValue().toString();
        LOG.debug("Responded MessageType.READ_KEYVALUE - Key: "
            + pair.getKey()
            + " Value: "
            + ((valueStr.length() < 10) ? valueStr : valueStr.substring(0, 9)
                + "..."));

      } else {
        Text.writeString(stream, "");
        Text.writeString(stream, "");
        LOG.debug("Responded MessageType.READ_KEYVALUE - EMPTY KeyValue Pair");
      }
      binProtocol.flush();

    } else {
      /* TODO */
      /* Send empty Strings to show no KeyValue pair is available */
      WritableUtils.writeVInt(stream, MessageType.READ_KEYVALUE.code);
      Text.writeString(stream, "");
      Text.writeString(stream, "");
      binProtocol.flush();
      LOG.debug("Responded MessageType.READ_KEYVALUE - EMPTY KeyValue Pair");
    }
  }
+
  /**
   * Handles WRITE_KEYVALUE. Reads a key and a value from the downlink into
   * the reusable holders created in the constructor, then writes them to the
   * peer's output collector.
   */
  public void writeKeyValue() throws IOException {
    readObject(key); // string or binary only
    readObject(value); // string or binary only
    if (LOG.isDebugEnabled())
      LOG.debug("Got MessageType.WRITE_KEYVALUE - Key: " + key + " Value: "
          + value);
    peer.write(key, value);
  }
+
  /**
   * Handles SEQFILE_OPEN. Downlink payload: path, option ("r" or "w"), key
   * type name, value type name. Opens a SequenceFile reader or writer,
   * registers it in the corresponding map, and responds with a file id
   * (-1 on failure).
   *
   * NOTE(review): the id is the reader/writer's hashCode() — collisions
   * between handles (or with the -1 error sentinel) are possible; a counter
   * would be safer.
   *
   * NOTE(review): the writer is created with Text.class for both key and
   * value even though keyType/valueType were transferred; seqFileAppend only
   * ever appends Text, so this may be intentional — confirm.
   */
  public void seqFileOpen() throws IOException {
    String path = Text.readString(inStream);
    // option - read = "r" or write = "w"
    String option = Text.readString(inStream);
    // key and value Type stored in the SequenceFile
    String keyType = Text.readString(inStream);
    String valueType = Text.readString(inStream);

    int fileID = -1;

    FileSystem fs = FileSystem.get(conf);
    if (option.equals("r")) {
      SequenceFile.Reader reader;
      try {
        reader = new SequenceFile.Reader(fs, new Path(path), conf);
        fileID = reader.hashCode();
        sequenceFileReaders
            .put(
                fileID,
                new AbstractMap.SimpleEntry<SequenceFile.Reader, Entry<String, String>>(
                    reader, new AbstractMap.SimpleEntry<String, String>(
                        keyType, valueType)));
      } catch (IOException e) {
        // open failed: report -1 to the client instead of propagating
        fileID = -1;
      }

    } else if (option.equals("w")) {
      SequenceFile.Writer writer;
      try {
        writer = new SequenceFile.Writer(fs, conf, new Path(path), Text.class,
            Text.class);
        fileID = writer.hashCode();
        sequenceFileWriters
            .put(
                fileID,
                new AbstractMap.SimpleEntry<SequenceFile.Writer, Entry<String, String>>(
                    writer, new AbstractMap.SimpleEntry<String, String>(
                        keyType, valueType)));
      } catch (IOException e) {
        // open failed: report -1 to the client instead of propagating
        fileID = -1;
      }
    }

    DataOutputStream stream = binProtocol.getStream();
    WritableUtils.writeVInt(stream, MessageType.SEQFILE_OPEN.code);
    WritableUtils.writeVInt(stream, fileID);
    binProtocol.flush();
    LOG.debug("Responded MessageType.SEQFILE_OPEN - FileID: " + fileID);
  }
+
+ public void seqFileReadNext() throws IOException, ClassNotFoundException {
+ int fileID = WritableUtils.readVInt(inStream);
+ // LOG.debug("GOT MessageType.SEQFILE_READNEXT - FileID: " + fileID);
+
+ Class<?> keyType = conf.getClassLoader().loadClass(
+ sequenceFileReaders.get(fileID).getValue().getKey());
+ Writable key = (Writable) ReflectionUtils.newInstance(keyType, conf);
+
+ Class<?> valueType = conf.getClassLoader().loadClass(
+ sequenceFileReaders.get(fileID).getValue().getValue());
+ Writable value = (Writable) ReflectionUtils.newInstance(valueType, conf);
+
+ if (sequenceFileReaders.containsKey(fileID))
+ sequenceFileReaders.get(fileID).getKey().next(key, value);
+
+ // RESPOND
+ DataOutputStream stream = binProtocol.getStream();
+ WritableUtils.writeVInt(stream, MessageType.SEQFILE_READNEXT.code);
+ try {
+ String k = key.toString();
+ String v = value.toString();
+ Text.writeString(stream, k);
+ Text.writeString(stream, v);
+ LOG.debug("Responded MessageType.SEQFILE_READNEXT - key: " + k
+ + " value: " + ((v.length() < 10) ? v : v.substring(0, 9) + "..."));
+
+ } catch (NullPointerException e) { // key or value is null
+
+ Text.writeString(stream, "");
+ Text.writeString(stream, "");
+ LOG.debug("Responded MessageType.SEQFILE_READNEXT - EMPTY KeyValue Pair");
+ }
+ binProtocol.flush();
+ }
+
  /**
   * Handles SEQFILE_APPEND. Downlink payload: file id, key string, value
   * string. Appends the pair as Text to the registered writer and responds
   * with the command code and 1 on success / 0 if the id is unknown.
   */
  public void seqFileAppend() throws IOException {
    int fileID = WritableUtils.readVInt(inStream);
    String keyStr = Text.readString(inStream);
    String valueStr = Text.readString(inStream);

    boolean result = false;
    if (sequenceFileWriters.containsKey(fileID)) {
      sequenceFileWriters.get(fileID).getKey()
          .append(new Text(keyStr), new Text(valueStr));
      result = true;
    }

    // RESPOND
    DataOutputStream stream = binProtocol.getStream();
    WritableUtils.writeVInt(stream, MessageType.SEQFILE_APPEND.code);
    WritableUtils.writeVInt(stream, result ? 1 : 0);
    binProtocol.flush();
    LOG.debug("Responded MessageType.SEQFILE_APPEND - Result: " + result);
  }
+
  /**
   * Handles SEQFILE_CLOSE. Closes the reader or writer registered under the
   * transferred file id and responds with 1 on success / 0 if unknown.
   *
   * NOTE(review): the closed handle is not removed from the map, so a second
   * close on the same id will close it again — consider remove() here.
   */
  public void seqFileClose() throws IOException {
    int fileID = WritableUtils.readVInt(inStream);

    boolean result = false;

    if (sequenceFileReaders.containsKey(fileID)) {
      sequenceFileReaders.get(fileID).getKey().close();
      result = true;
    } else if (sequenceFileWriters.containsKey(fileID)) {
      sequenceFileWriters.get(fileID).getKey().close();
      result = true;
    }

    // RESPOND
    DataOutputStream stream = binProtocol.getStream();
    WritableUtils.writeVInt(stream, MessageType.SEQFILE_CLOSE.code);
    WritableUtils.writeVInt(stream, result ? 1 : 0);
    binProtocol.flush();
    LOG.debug("Responded MessageType.SEQFILE_CLOSE - Result: " + result);
  }
+
  /**
   * Handles PARTITION_RESPONSE. Stores the transferred partition result in
   * the protocol and wakes up the thread waiting on resultLock.
   */
  public void partitionResponse() throws IOException {
    int partResponse = WritableUtils.readVInt(inStream);
    synchronized (binProtocol.resultLock) {
      binProtocol.setResult(partResponse);
      LOG.debug("Received MessageType.PARTITION_RESPONSE - Result: "
          + partResponse);
      binProtocol.resultLock.notify();
    }
  }
+
  /**
   * Reads a length-prefixed payload from the downlink into the given
   * Writable. Only BytesWritable and Text are supported; NullWritable and
   * any other type raise an IOException.
   *
   * @param obj destination object (BytesWritable or Text)
   * @throws IOException on stream errors or unsupported target types
   */
  protected void readObject(Writable obj) throws IOException {
    // The length is transferred as a VInt, same encoding as command codes.
    int numBytes = readCommand();
    byte[] buffer;
    // For BytesWritable and Text, use the specified length to set the length
    // this causes the "obvious" translations to work. So that if you emit
    // a string "abc" from C++, it shows up as "abc".
    if (obj instanceof BytesWritable) {
      buffer = new byte[numBytes];
      inStream.readFully(buffer);
      ((BytesWritable) obj).set(buffer, 0, numBytes);
    } else if (obj instanceof Text) {
      buffer = new byte[numBytes];
      inStream.readFully(buffer);
      ((Text) obj).set(buffer);
    } else if (obj instanceof NullWritable) {
      throw new IOException(
          "Cannot read data into NullWritable! Check OutputClasses!");
    } else {
      /* TODO */
      /* IntWritable, DoubleWritable */
      throw new IOException(
          "Hama Pipes does only support Text as Key/Value output!");
      // obj.readFields(inStream);
    }
  }
+}
Added: hama/trunk/core/src/main/java/org/apache/hama/pipes/util/DistributedCacheUtil.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/util/DistributedCacheUtil.java?rev=1465852&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/util/DistributedCacheUtil.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/util/DistributedCacheUtil.java Tue Apr 9 01:28:04 2013
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.pipes.util;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.net.URLClassLoader;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringUtils;
+
+public class DistributedCacheUtil {
+
+ private static final Log LOG = LogFactory.getLog(DistributedCacheUtil.class);
+
+ /**
+ * Transfers DistributedCache files into the local cache files. Also creates
+ * symbolic links for URIs specified with a fragment if
+ * DistributedCache.getSymlinks() is true.
+ *
+ * @throws IOException If a DistributedCache file cannot be found.
+ */
+ public static final void moveLocalFiles(Configuration conf)
+ throws IOException {
+ StringBuilder files = new StringBuilder();
+ boolean first = true;
+ if (DistributedCache.getCacheFiles(conf) != null) {
+ for (URI uri : DistributedCache.getCacheFiles(conf)) {
+ if (uri != null) {
+ if (!first) {
+ files.append(",");
+ }
+ if (null != uri.getFragment() && DistributedCache.getSymlink(conf)) {
+
+ FileUtil.symLink(uri.getPath(), uri.getFragment());
+ files.append(uri.getFragment()).append(",");
+ }
+ FileSystem hdfs = FileSystem.get(conf);
+ Path pathSrc = new Path(uri.getPath());
+ // LOG.info("pathSrc: " + pathSrc);
+ if (hdfs.exists(pathSrc)) {
+ LocalFileSystem local = LocalFileSystem.getLocal(conf);
+ Path pathDst = new Path(local.getWorkingDirectory(),
+ pathSrc.getName());
+ // LOG.info("user.dir: "+System.getProperty("user.dir"));
+ // LOG.info("WorkingDirectory: "+local.getWorkingDirectory());
+ // LOG.info("pathDst: " + pathDst);
+ LOG.debug("copyToLocalFile: " + pathDst);
+ hdfs.copyToLocalFile(pathSrc, pathDst);
+ local.deleteOnExit(pathDst);
+ files.append(pathDst.toUri().getPath());
+ }
+ first = false;
+ }
+ }
+ }
+ if (files.length() > 0) {
+ DistributedCache.addLocalFiles(conf, files.toString());
+ }
+ }
+
+ /**
+ * Add the Files to HDFS
+ *
+ * @param conf
+ * @param paths
+ */
+ public static String addFilesToHDFS(Configuration conf, String files) {
+ if (files == null)
+ return null;
+ String[] fileArr = files.split(",");
+ String[] finalArr = new String[fileArr.length];
+
+ for (int i = 0; i < fileArr.length; i++) {
+ String tmp = fileArr[i];
+ String finalPath;
+
+ URI pathURI;
+ try {
+ pathURI = new URI(tmp);
+ } catch (URISyntaxException e) {
+ throw new IllegalArgumentException(e);
+ }
+
+ try {
+ LocalFileSystem local = LocalFileSystem.getLocal(conf);
+ Path pathSrc = new Path(pathURI);
+ // LOG.info("pathSrc: " + pathSrc);
+
+ if (local.exists(pathSrc)) {
+ FileSystem hdfs = FileSystem.get(conf);
+ Path pathDst = new Path(hdfs.getWorkingDirectory() + "/temp",
+ pathSrc.getName());
+
+ // LOG.info("WorkingDirectory: " + hdfs.getWorkingDirectory());
+ LOG.debug("copyToHDFSFile: " + pathDst);
+ hdfs.copyFromLocalFile(pathSrc, pathDst);
+ hdfs.deleteOnExit(pathDst);
+
+ finalPath = pathDst.makeQualified(hdfs).toString();
+ finalArr[i] = finalPath;
+ }
+ } catch (IOException e) {
+ LOG.error(e);
+ }
+
+ }
+ return StringUtils.arrayToString(finalArr);
+ }
+
+ /**
+ * Add the JARs from the given HDFS paths to the Classpath
+ *
+ * @param conf
+ * @param urls
+ */
+ public static URL[] addJarsToJobClasspath(Configuration conf) {
+ URL[] classLoaderURLs = ((URLClassLoader) conf.getClassLoader()).getURLs();
+ String files = conf.get("tmpjars", "");
+
+ if (!files.isEmpty()) {
+ String[] fileArr = files.split(",");
+ URL[] libjars = new URL[fileArr.length + classLoaderURLs.length];
+
+ for (int i = 0; i < fileArr.length; i++) {
+ String tmp = fileArr[i];
+
+ URI pathURI;
+ try {
+ pathURI = new URI(tmp);
+ } catch (URISyntaxException e) {
+ throw new IllegalArgumentException(e);
+ }
+
+ try {
+ FileSystem hdfs = FileSystem.get(conf);
+ Path pathSrc = new Path(pathURI.getPath());
+ // LOG.info("pathSrc: " + pathSrc);
+
+ if (hdfs.exists(pathSrc)) {
+ LocalFileSystem local = LocalFileSystem.getLocal(conf);
+
+ // File dst = File.createTempFile(pathSrc.getName() + "-", ".jar");
+ Path pathDst = new Path(local.getWorkingDirectory(),
+ pathSrc.getName());
+
+ LOG.debug("copyToLocalFile: " + pathDst);
+ hdfs.copyToLocalFile(pathSrc, pathDst);
+ local.deleteOnExit(pathDst);
+
+ libjars[i] = new URL(local.makeQualified(pathDst).toString());
+ }
+
+ } catch (IOException ex) {
+ throw new RuntimeException("Error setting up classpath", ex);
+ }
+ }
+
+ // Add old classLoader entries
+ int index = fileArr.length;
+ for (int i = 0; i < classLoaderURLs.length; i++) {
+ libjars[index] = classLoaderURLs[i];
+ index++;
+ }
+
+ // Set classloader in current conf/thread
+ conf.setClassLoader(new URLClassLoader(libjars, conf.getClassLoader()));
+
+ Thread.currentThread().setContextClassLoader(
+ new URLClassLoader(libjars, Thread.currentThread()
+ .getContextClassLoader()));
+
+ // URL[] urls = ((URLClassLoader) conf.getClassLoader()).getURLs();
+ // for (URL u : urls)
+ // LOG.info("newClassLoader: " + u.getPath());
+
+ // Set tmpjars
+ // hdfs to local path
+ String jars = "";
+ for (int i = 0; i < fileArr.length; i++) {
+ URL url = libjars[i];
+ if (jars.length() > 0) {
+ jars += ",";
+ }
+ jars += url.toString();
+ }
+ conf.set("tmpjars", jars);
+
+ return libjars;
+ }
+ return null;
+ }
+
+}
Added: hama/trunk/core/src/main/java/org/apache/hama/pipes/util/SequenceFileDumper.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/util/SequenceFileDumper.java?rev=1465852&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/util/SequenceFileDumper.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/util/SequenceFileDumper.java Tue Apr 9 01:28:04 2013
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.pipes.util;
+
+import java.io.FileWriter;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+
+import org.apache.commons.cli.BasicParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.Parser;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.HamaConfiguration;
+//import org.apache.hama.util.GenericOptionsParser;
+
+public class SequenceFileDumper {
+
+ protected static final Log LOG = LogFactory.getLog(SequenceFileDumper.class);
+ public static String LINE_SEP = System.getProperty("line.separator");
+
+ private SequenceFileDumper() {
+ }
+
+ /**
+ * A command line parser for the CLI-based SequenceFileDumper.
+ */
+ static class CommandLineParser {
+ private Options options = new Options();
+
+ void addOption(String longName, boolean required, String description,
+ String paramName) {
+ Option option = OptionBuilder.withArgName(paramName).hasArgs(1)
+ .withDescription(description).isRequired(required).create(longName);
+ options.addOption(option);
+ }
+
+ void addArgument(String name, boolean required, String description) {
+ Option option = OptionBuilder.withArgName(name).hasArgs(1)
+ .withDescription(description).isRequired(required).create();
+ options.addOption(option);
+
+ }
+
+ Parser createParser() {
+ Parser result = new BasicParser();
+ return result;
+ }
+
+ void printUsage() {
+ // The CLI package should do this for us, but I can't figure out how
+ // to make it print something reasonable.
+ System.out.println("hama seqdumper");
+ System.out
+ .println(" [-seqFile <path>] // The Sequence File containing the Clusters");
+ System.out
+ .println(" [-output <path>] // The output file. If not specified, dumps to the console");
+ System.out
+ .println(" [-substring <number> // The number of chars of the FormatString() to print");
+ System.out.println(" [-count <true>] // Report the count only");
+ System.out.println(" [-help] // Print out help");
+ System.out.println();
+ //GenericOptionsParser.printGenericCommandUsage(System.out);
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ CommandLineParser cli = new CommandLineParser();
+ if (args.length == 0) {
+ cli.printUsage();
+ return;
+ }
+
+ LOG.info("DEBUG: Hama SequenceFileDumper started!");
+
+ cli.addOption("seqFile", false,
+ "The Sequence File containing the Clusters", "path");
+ cli.addOption("output", false,
+ "The output file. If not specified, dumps to the console", "path");
+
+ cli.addOption("substring", false,
+ "The number of chars of the FormatString() to print", "number");
+ cli.addOption("count", false, "Report the count only", "number");
+ cli.addOption("help", false, "Print out help", "class");
+
+ Parser parser = cli.createParser();
+
+ try {
+ HamaConfiguration conf = new HamaConfiguration();
+
+ //GenericOptionsParser genericParser = new GenericOptionsParser(conf, args);
+
+ CommandLine cmdLine = parser.parse(cli.options, args);
+ // genericParser.getRemainingArgs());
+ LOG.debug("DEBUG: Arguments: " + args); //genericParser.getRemainingArgs());
+
+ if (cmdLine.hasOption("help")) {
+ cli.printUsage();
+ return;
+ }
+
+ if (cmdLine.hasOption("seqFile")) {
+ Path path = new Path(cmdLine.getOptionValue("seqFile"));
+
+ FileSystem fs = FileSystem.get(path.toUri(), conf);
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
+
+ Writer writer;
+ if (cmdLine.hasOption("output")) {
+ writer = new FileWriter(cmdLine.getOptionValue("output"));
+ } else {
+ writer = new OutputStreamWriter(System.out);
+ }
+ writer.append("Input Path: ").append(String.valueOf(path))
+ .append(LINE_SEP);
+
+ int sub = Integer.MAX_VALUE;
+ if (cmdLine.hasOption("substring")) {
+ sub = Integer.parseInt(cmdLine.getOptionValue("substring"));
+ }
+
+ boolean countOnly = cmdLine.hasOption("count");
+
+ Writable key = (Writable) reader.getKeyClass().newInstance();
+ Writable value = (Writable) reader.getValueClass().newInstance();
+ writer.append("Key class: ")
+ .append(String.valueOf(reader.getKeyClass()))
+ .append(" Value Class: ").append(String.valueOf(value.getClass()))
+ .append(LINE_SEP);
+ writer.flush();
+
+ long count = 0;
+ if (countOnly == false) {
+ while (reader.next(key, value)) {
+ writer.append("Key: ").append(String.valueOf(key));
+ String str = value.toString();
+ writer.append(": Value: ").append(
+ str.length() > sub ? str.substring(0, sub) : str);
+ writer.write(LINE_SEP);
+ writer.flush();
+ count++;
+ }
+ writer.append("Count: ").append(String.valueOf(count))
+ .append(LINE_SEP);
+ } else {
+ while (reader.next(key, value)) {
+ count++;
+ }
+ writer.append("Count: ").append(String.valueOf(count))
+ .append(LINE_SEP);
+ }
+ writer.flush();
+
+ if (cmdLine.hasOption("output")) {
+ writer.close();
+ }
+ reader.close();
+ }
+
+ } catch (ParseException e) {
+ LOG.info("Error : " + e);
+ cli.printUsage();
+ return;
+ }
+ }
+}
Modified: hama/trunk/pom.xml
URL: http://svn.apache.org/viewvc/hama/trunk/pom.xml?rev=1465852&r1=1465851&r2=1465852&view=diff
==============================================================================
--- hama/trunk/pom.xml (original)
+++ hama/trunk/pom.xml Tue Apr 9 01:28:04 2013
@@ -302,7 +302,7 @@
<configuration>
<excludes>
<exclude>.idea/**</exclude>
- <exclude>.git/**</exclude>
+ <exclude>.git/**</exclude>
<exclude>.classpath/**</exclude>
<exclude>.project</exclude>
<exclude>**/*.asc</exclude>
@@ -314,6 +314,7 @@
<exclude>**/src/test/resources/*.properties</exclude>
<exclude>**dependency-reduced-pom.xml</exclude>
<exclude>**/src/test/resources/*.txt</exclude>
+ <exclude>c++/**</exclude>
</excludes>
</configuration>
</plugin>