You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by tj...@apache.org on 2012/08/18 10:36:38 UTC

svn commit: r1374521 - in /hama/trunk: CHANGES.txt core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java

Author: tjungblut
Date: Sat Aug 18 08:36:38 2012
New Revision: 1374521

URL: http://svn.apache.org/viewvc?rev=1374521&view=rev
Log:
[HAMA-628]: DistributedCache - getLocalCacheFiles returns NULL in Pseudo Distributed Mode (Martin Illecker via tjungblut)

Modified:
    hama/trunk/CHANGES.txt
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java

Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1374521&r1=1374520&r2=1374521&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Sat Aug 18 08:36:38 2012
@@ -29,7 +29,8 @@ Release 0.5 - April 10, 2012 
    HAMA-409: Add basic Graph interfaces and GraphJobRunner (edwardyoon)
 
   BUG FIXES
-
+   
+   HAMA-628: DistributedCache - getLocalCacheFiles returns NULL in Pseudo Distributed Mode (Martin Illecker via tjungblut)
    HAMA-621: Input Splits are not initialized for a job (surajmenon via tjungblut)
    HAMA-595: Fix NullPointerException in Task Scheduler (surajmenon)
 

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1374521&r1=1374520&r2=1374521&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Sat Aug 18 08:36:38 2012
@@ -19,13 +19,17 @@ package org.apache.hama.bsp;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.net.URI;
 import java.util.Iterator;
 import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.DataInputBuffer;
@@ -54,7 +58,10 @@ public final class BSPPeerImpl<K1, V1, K
   private static final Log LOG = LogFactory.getLog(BSPPeerImpl.class);
 
   public static enum PeerCounter {
-    SUPERSTEP_SUM, SUPERSTEPS, TASK_INPUT_RECORDS, TASK_OUTPUT_RECORDS, IO_BYTES_READ, MESSAGE_BYTES_TRANSFERED, MESSAGE_BYTES_RECEIVED, TOTAL_MESSAGES_SENT, TOTAL_MESSAGES_RECEIVED, COMPRESSED_BYTES_SENT, COMPRESSED_BYTES_RECEIVED, TIME_IN_SYNC_MS
+    SUPERSTEP_SUM, SUPERSTEPS, TASK_INPUT_RECORDS, TASK_OUTPUT_RECORDS,
+    IO_BYTES_READ, MESSAGE_BYTES_TRANSFERED, MESSAGE_BYTES_RECEIVED,
+    TOTAL_MESSAGES_SENT, TOTAL_MESSAGES_RECEIVED, COMPRESSED_BYTES_SENT,
+    COMPRESSED_BYTES_RECEIVED, TIME_IN_SYNC_MS
   }
 
   private final Configuration conf;
@@ -232,6 +239,45 @@ public final class BSPPeerImpl<K1, V1, K
     }
   }
 
+  /**
+   * Copies DistributedCache files into the local working directory and adds
+   * them to the local cache files. A URI fragment is symlinked to the URI
+   * path if DistributedCache.getSymlink() is true. Cache files that do not
+   * exist are skipped; the recorded list never contains empty entries.
+   *
+   * @throws IOException If creating a symlink or copying a cache file fails.
+   */
+  public final void moveLocalFiles() throws IOException {
+    URI[] cacheFiles = DistributedCache.getCacheFiles(conf);
+    if (cacheFiles == null || cacheFiles.length == 0) {
+      return;
+    }
+    StringBuilder files = new StringBuilder();
+    FileSystem hdfs = FileSystem.get(conf);
+    LocalFileSystem local = FileSystem.getLocal(conf);
+    for (URI uri : cacheFiles) {
+      if (uri == null) {
+        continue;
+      }
+      if (null != uri.getFragment() && DistributedCache.getSymlink(conf)) {
+        FileUtil.symLink(uri.getPath(), uri.getFragment());
+        // Separator goes in front of each entry, so no stray commas remain.
+        files.append(files.length() > 0 ? "," : "").append(uri.getFragment());
+      }
+      Path pathSrc = new Path(uri.getPath());
+      if (hdfs.exists(pathSrc)) {
+        Path pathDst = new Path(local.getWorkingDirectory(),
+            pathSrc.getName());
+        hdfs.copyToLocalFile(pathSrc, pathDst);
+        files.append(files.length() > 0 ? "," : "").append(
+            pathDst.toUri().getPath());
+      }
+    }
+    if (files.length() > 0) {
+      DistributedCache.addLocalFiles(conf, files.toString());
+    }
+  }
+
   @SuppressWarnings("unchecked")
   public final void initialize() throws Exception {
 
@@ -253,6 +299,13 @@ public final class BSPPeerImpl<K1, V1, K
       }
     };
 
+    // Move files from DistributedCache to the local cache
+    // and set DistributedCache.LocalFiles
+    try {
+      moveLocalFiles();
+    } catch (Exception e) {
+      LOG.error(e);
+    }
   }
 
   @SuppressWarnings("unchecked")
@@ -451,6 +504,25 @@ public final class BSPPeerImpl<K1, V1, K
         currentTaskStatus.getSuperstepCount());
   }
 
+  /**
+   * Deletes all local cache files and then clears the local cache file list.
+   *
+   * @throws IOException If a local file system operation fails.
+   */
+  public void deleteLocalFiles() throws IOException {
+    Path[] cached = DistributedCache.getLocalCacheFiles(conf);
+    for (int i = 0; cached != null && i < cached.length; i++) {
+      if (cached[i] == null) {
+        continue;
+      }
+      LocalFileSystem localFs = FileSystem.getLocal(conf);
+      if (localFs.exists(cached[i])) {
+        localFs.delete(cached[i], true); // recursive: directories included
+      }
+    }
+    DistributedCache.setLocalFiles(conf, "");
+  }
+
   public final void close() {
     // there are many catches, because we want to close always every component
     // even if the one before failed.
@@ -479,6 +551,12 @@ public final class BSPPeerImpl<K1, V1, K
     } catch (Exception e) {
       LOG.error(e);
     }
+    // Delete files from the local cache
+    try {
+      deleteLocalFiles();
+    } catch (Exception e) {
+      LOG.error(e);
+    }
   }
 
   @Override