You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by tj...@apache.org on 2012/08/18 10:36:38 UTC
svn commit: r1374521 - in /hama/trunk: CHANGES.txt
core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
Author: tjungblut
Date: Sat Aug 18 08:36:38 2012
New Revision: 1374521
URL: http://svn.apache.org/viewvc?rev=1374521&view=rev
Log:
[HAMA-628]: DistributedCache - getLocalCacheFiles returns NULL in Pseudo Distributed Mode (Martin Illecker via tjungblut)
Modified:
hama/trunk/CHANGES.txt
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1374521&r1=1374520&r2=1374521&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Sat Aug 18 08:36:38 2012
@@ -29,7 +29,8 @@ Release 0.5 - April 10, 2012
HAMA-409: Add basic Graph interfaces and GraphJobRunner (edwardyoon)
BUG FIXES
-
+
+ HAMA-628: DistributedCache - getLocalCacheFiles returns NULL in Pseudo Distributed Mode (Martin Illecker via tjungblut)
HAMA-621: Input Splits are not initialized for a job (surajmenon via tjungblut)
HAMA-595: Fix NullPointerException in Task Scheduler (surajmenon)
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1374521&r1=1374520&r2=1374521&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Sat Aug 18 08:36:38 2012
@@ -19,13 +19,17 @@ package org.apache.hama.bsp;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.net.URI;
import java.util.Iterator;
import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataInputBuffer;
@@ -54,7 +58,10 @@ public final class BSPPeerImpl<K1, V1, K
private static final Log LOG = LogFactory.getLog(BSPPeerImpl.class);
public static enum PeerCounter {
- SUPERSTEP_SUM, SUPERSTEPS, TASK_INPUT_RECORDS, TASK_OUTPUT_RECORDS, IO_BYTES_READ, MESSAGE_BYTES_TRANSFERED, MESSAGE_BYTES_RECEIVED, TOTAL_MESSAGES_SENT, TOTAL_MESSAGES_RECEIVED, COMPRESSED_BYTES_SENT, COMPRESSED_BYTES_RECEIVED, TIME_IN_SYNC_MS
+ SUPERSTEP_SUM, SUPERSTEPS, TASK_INPUT_RECORDS, TASK_OUTPUT_RECORDS,
+ IO_BYTES_READ, MESSAGE_BYTES_TRANSFERED, MESSAGE_BYTES_RECEIVED,
+ TOTAL_MESSAGES_SENT, TOTAL_MESSAGES_RECEIVED, COMPRESSED_BYTES_SENT,
+ COMPRESSED_BYTES_RECEIVED, TIME_IN_SYNC_MS
}
private final Configuration conf;
@@ -232,6 +239,45 @@ public final class BSPPeerImpl<K1, V1, K
}
}
+ /**
+  * Copies the files registered in the DistributedCache to the local working
+  * directory and records the local copies via
+  * DistributedCache.addLocalFiles(). For URIs that carry a fragment a
+  * symbolic link named after the fragment is created as well, provided
+  * DistributedCache.getSymlink() is true.
+  * 
+  * Cache files that do not exist on the configured file system are skipped
+  * and do not produce an entry in the local file list.
+  * 
+  * @throws IOException If accessing the file systems, creating a symbolic
+  *           link or copying a file fails.
+  */
+ public final void moveLocalFiles() throws IOException {
+   // Look the cache files up once; null means nothing was registered.
+   URI[] cacheFiles = DistributedCache.getCacheFiles(conf);
+   if (cacheFiles == null || cacheFiles.length == 0) {
+     return;
+   }
+
+   // Loop invariants: resolve them once instead of per URI.
+   FileSystem hdfs = FileSystem.get(conf);
+   boolean createSymlinks = DistributedCache.getSymlink(conf);
+
+   StringBuilder files = new StringBuilder();
+   for (URI uri : cacheFiles) {
+     if (uri == null) {
+       continue;
+     }
+     String fragment = uri.getFragment();
+     if (fragment != null && createSymlinks) {
+       // FileUtil.symLink reports failure through its exit code only.
+       int exitCode = FileUtil.symLink(uri.getPath(), fragment);
+       if (exitCode != 0) {
+         LOG.warn("Creating symlink " + fragment + " -> " + uri.getPath()
+             + " failed with exit code " + exitCode);
+       }
+       appendLocalCacheEntry(files, fragment);
+     }
+     Path pathSrc = new Path(uri.getPath());
+     if (hdfs.exists(pathSrc)) {
+       LocalFileSystem local = FileSystem.getLocal(conf);
+       Path pathDst = new Path(local.getWorkingDirectory(),
+           pathSrc.getName());
+       hdfs.copyToLocalFile(pathSrc, pathDst);
+       appendLocalCacheEntry(files, pathDst.toUri().getPath());
+     }
+   }
+   if (files.length() > 0) {
+     DistributedCache.addLocalFiles(conf, files.toString());
+   }
+ }
+
+ /**
+  * Appends one entry to a comma separated list. The separator is written
+  * only between entries, so skipped cache files can never leave a leading,
+  * trailing or doubled comma in the list.
+  */
+ private static void appendLocalCacheEntry(StringBuilder list, String entry) {
+   if (list.length() > 0) {
+     list.append(',');
+   }
+   list.append(entry);
+ }
+
@SuppressWarnings("unchecked")
public final void initialize() throws Exception {
@@ -253,6 +299,13 @@ public final class BSPPeerImpl<K1, V1, K
}
};
+ // Move files from DistributedCache to the local cache
+ // and set DistributedCache.LocalFiles
+ try {
+ moveLocalFiles();
+ } catch (Exception e) {
+ LOG.error(e);
+ }
}
@SuppressWarnings("unchecked")
@@ -451,6 +504,25 @@ public final class BSPPeerImpl<K1, V1, K
currentTaskStatus.getSuperstepCount());
}
+ /**
+ * Delete files from the local cache
+ *
+ * @throws IOException If a DistributedCache file cannot be found.
+ */
+ public void deleteLocalFiles() throws IOException {
+ if (DistributedCache.getLocalCacheFiles(conf) != null) {
+ for (Path path : DistributedCache.getLocalCacheFiles(conf)) {
+ if (path != null) {
+ LocalFileSystem local = FileSystem.getLocal(conf);
+ if (local.exists(path)) {
+ local.delete(path, true); // recursive true
+ }
+ }
+ }
+ }
+ DistributedCache.setLocalFiles(conf, "");
+ }
+
public final void close() {
// there are many catches, because we want to close always every component
// even if the one before failed.
@@ -479,6 +551,12 @@ public final class BSPPeerImpl<K1, V1, K
} catch (Exception e) {
LOG.error(e);
}
+ // Delete files from the local cache
+ try {
+ deleteLocalFiles();
+ } catch (Exception e) {
+ LOG.error(e);
+ }
}
@Override