Posted to commits@hama.apache.org by ed...@apache.org on 2014/02/04 06:44:26 UTC

svn commit: r1564208 - in /hama/trunk: conf/hama-default.xml core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java

Author: edwardyoon
Date: Tue Feb  4 05:44:26 2014
New Revision: 1564208

URL: http://svn.apache.org/r1564208
Log:
HAMA-865: Use SequenceFile.Sorter for PartitioningRunner instead of a memory-based sorted map.

Modified:
    hama/trunk/conf/hama-default.xml
    hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java

Modified: hama/trunk/conf/hama-default.xml
URL: http://svn.apache.org/viewvc/hama/trunk/conf/hama-default.xml?rev=1564208&r1=1564207&r2=1564208&view=diff
==============================================================================
--- hama/trunk/conf/hama-default.xml (original)
+++ hama/trunk/conf/hama-default.xml Tue Feb  4 05:44:26 2014
@@ -172,6 +172,19 @@
     </description>
   </property>
   <property>
+    <name>bsp.input.runtime.partitioning.sort.mb</name>
+    <value>50</value>
+    <description>The total amount of buffer memory to use while sorting, in megabytes.
+    </description>
+  </property>
+  <property>
+    <name>bsp.input.runtime.partitioning.sort.factor</name>
+    <value>10</value>
+    <description>The maximum number of streams to merge at once; the default is 10.
+    </description>
+  </property>
+  
+  <property>
     <name>io.serializations</name>
     <value>org.apache.hadoop.io.serializer.WritableSerialization,org.apache.hadoop.io.serializer.JavaSerialization</value>
     <description>The default IO serialization protocol for HDFS I/O</description>
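
The two new properties above are consumed by the SequenceFile.Sorter code path added to
PartitioningRunner (second diff below). As a minimal, self-contained sketch of that wiring,
mirroring the committed code; the key/value classes and the slice path here are illustrative
only, not part of the patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.MapWritable;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;

    public class SorterConfigSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);

        // Key/value classes are illustrative; PartitioningRunner uses whatever
        // comparable key the record converter emits, with MapWritable values.
        SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs, Text.class,
            MapWritable.class, conf);

        // setMemory() expects bytes, so the MB-valued property is scaled up.
        sorter.setMemory(
            conf.getInt("bsp.input.runtime.partitioning.sort.mb", 50) * 1024 * 1024);
        // setFactor() bounds how many on-disk streams are merged in one pass.
        sorter.setFactor(
            conf.getInt("bsp.input.runtime.partitioning.sort.factor", 10));

        Path slice = new Path("/tmp/parts/part-0/file-0");   // illustrative path
        sorter.sort(slice, slice.suffix(".sorted"));
      }
    }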

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java?rev=1564208&r1=1564207&r2=1564208&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java Tue Feb  4 05:44:26 2014
@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.SortedMap;
 import java.util.TreeMap;
 
@@ -31,7 +30,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
@@ -134,58 +133,51 @@ public class PartitioningRunner extends
 
   public Map<Integer, SequenceFile.Writer> writerCache = new HashMap<Integer, SequenceFile.Writer>();
 
-  @SuppressWarnings("rawtypes")
-  public SortedMap<WritableComparable, KeyValuePair<IntWritable, KeyValuePair>> sortedMap = new TreeMap<WritableComparable, KeyValuePair<IntWritable, KeyValuePair>>();
-
   @Override
-  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @SuppressWarnings({ "rawtypes" })
   public void bsp(
       BSPPeer<Writable, Writable, Writable, Writable, NullWritable> peer)
       throws IOException, SyncException, InterruptedException {
 
     int peerNum = peer.getNumPeers();
     Partitioner partitioner = getPartitioner();
-    KeyValuePair<Writable, Writable> pair = null;
-    KeyValuePair<Writable, Writable> outputPair = null;
+    KeyValuePair<Writable, Writable> rawRecord = null;
+    KeyValuePair<Writable, Writable> convertedRecord = null;
 
-    Class keyClass = null;
-    Class valueClass = null;
-    while ((pair = peer.readNext()) != null) {
-      if (keyClass == null && valueClass == null) {
-        keyClass = pair.getKey().getClass();
-        valueClass = pair.getValue().getClass();
+    Class convertedKeyClass = null;
+    Class rawKeyClass = null;
+    Class rawValueClass = null;
+    MapWritable raw = null;
+
+    while ((rawRecord = peer.readNext()) != null) {
+      if (rawKeyClass == null && rawValueClass == null) {
+        rawKeyClass = rawRecord.getKey().getClass();
+        rawValueClass = rawRecord.getValue().getClass();
       }
-      outputPair = converter.convertRecord(pair, conf);
+      convertedRecord = converter.convertRecord(rawRecord, conf);
 
-      if (outputPair == null) {
+      if (convertedRecord == null) {
         continue;
       }
+      Writable convertedKey = convertedRecord.getKey();
+      convertedKeyClass = convertedKey.getClass();
 
-      int index = converter.getPartitionId(outputPair, partitioner, conf, peer,
-          desiredNum);
-
-      // if key is comparable and it need to be sorted by key,
-      if (outputPair.getKey() instanceof WritableComparable
-          && conf.getBoolean(Constants.PARTITION_SORT_BY_KEY, false)) {
-        sortedMap.put(
-            (WritableComparable) outputPair.getKey(),
-            new KeyValuePair(new IntWritable(index), new KeyValuePair(pair
-                .getKey(), pair.getValue())));
-      } else {
-        if (!writerCache.containsKey(index)) {
-          Path destFile = new Path(partitionDir + "/part-" + index + "/file-"
-              + peer.getPeerIndex());
-          SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
-              destFile, keyClass, valueClass, CompressionType.NONE);
-          writerCache.put(index, writer);
-        }
+      int index = converter.getPartitionId(convertedRecord, partitioner, conf,
+          peer, desiredNum);
 
-        writerCache.get(index).append(pair.getKey(), pair.getValue());
+      if (!writerCache.containsKey(index)) {
+        Path destFile = new Path(partitionDir + "/part-" + index + "/file-"
+            + peer.getPeerIndex());
+        SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
+            destFile, convertedKeyClass, MapWritable.class,
+            CompressionType.NONE);
+        writerCache.put(index, writer);
       }
-    }
 
-    if (sortedMap.size() > 0) {
-      writeSortedFile(peer.getPeerIndex(), keyClass, valueClass);
+      raw = new MapWritable();
+      raw.put(rawRecord.getKey(), rawRecord.getValue());
+
+      writerCache.get(index).append(convertedKey, raw);
     }
 
     for (SequenceFile.Writer w : writerCache.values()) {
@@ -206,11 +198,13 @@ public class PartitioningRunner extends
             + getPartitionName(partitionID));
 
         FileStatus[] files = fs.listStatus(stat.getPath());
-        if (outputPair.getKey() instanceof WritableComparable
+        if (convertedRecord.getKey() instanceof WritableComparable
             && conf.getBoolean(Constants.PARTITION_SORT_BY_KEY, false)) {
-          mergeSortedFiles(files, destinationFilePath, keyClass, valueClass);
+          mergeSortedFiles(files, destinationFilePath, convertedKeyClass,
+              rawKeyClass, rawValueClass);
         } else {
-          mergeFiles(files, destinationFilePath, keyClass, valueClass);
+          mergeFiles(files, destinationFilePath, convertedKeyClass,
+              rawKeyClass, rawValueClass);
         }
         fs.delete(stat.getPath(), true);
       }
@@ -218,99 +212,105 @@ public class PartitioningRunner extends
   }
 
   @SuppressWarnings("rawtypes")
-  private void writeSortedFile(int peerIndex, Class keyClass, Class valueClass)
-      throws IOException {
-    for (Entry<WritableComparable, KeyValuePair<IntWritable, KeyValuePair>> e : sortedMap
-        .entrySet()) {
-      int index = ((IntWritable) e.getValue().getKey()).get();
-      KeyValuePair rawRecord = e.getValue().getValue();
-
-      if (!writerCache.containsKey(index)) {
-        Path destFile = new Path(partitionDir + "/part-" + index + "/file-"
-            + peerIndex);
-        SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
-            destFile, keyClass, valueClass, CompressionType.NONE);
-        writerCache.put(index, writer);
-      }
-
-      writerCache.get(index).append(rawRecord.getKey(), rawRecord.getValue());
-    }
-
-    sortedMap.clear();
-  }
+  public SortedMap<WritableComparable, KeyValuePair<Integer, KeyValuePair>> comparisonMap = new TreeMap<WritableComparable, KeyValuePair<Integer, KeyValuePair>>();
 
   @SuppressWarnings({ "rawtypes", "unchecked" })
   private void mergeSortedFiles(FileStatus[] status, Path destinationFilePath,
-      Class keyClass, Class valueClass) throws IOException {
+      Class convertedKeyClass, Class rawKeyClass, Class rawValueClass)
+      throws IOException {
     SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
-        destinationFilePath, keyClass, valueClass, CompressionType.NONE);
-    KeyValuePair outputPair = null;
-    Writable key;
-    Writable value;
+        destinationFilePath, rawKeyClass, rawValueClass, CompressionType.NONE);
+    WritableComparable convertedKey;
+    MapWritable value;
 
     Map<Integer, SequenceFile.Reader> readers = new HashMap<Integer, SequenceFile.Reader>();
     for (int i = 0; i < status.length; i++) {
-      readers.put(i, new SequenceFile.Reader(fs, status[i].getPath(), conf));
+      SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs,
+          convertedKeyClass, MapWritable.class, conf);
+      sorter.setMemory(conf
+          .getInt("bsp.input.runtime.partitioning.sort.mb", 50) * 1024 * 1024);
+      sorter.setFactor(conf.getInt(
+          "bsp.input.runtime.partitioning.sort.factor", 10));
+      sorter.sort(status[i].getPath(), status[i].getPath().suffix(".sorted"));
+
+      readers.put(i,
+          new SequenceFile.Reader(fs, status[i].getPath().suffix(".sorted"),
+              conf));
     }
 
     for (int i = 0; i < readers.size(); i++) {
-      key = (Writable) ReflectionUtils.newInstance(keyClass, conf);
-      value = (Writable) ReflectionUtils.newInstance(valueClass, conf);
+      convertedKey = (WritableComparable) ReflectionUtils.newInstance(
+          convertedKeyClass, conf);
+      value = new MapWritable();
 
-      readers.get(i).next(key, value);
-      KeyValuePair record = new KeyValuePair(key, value);
-      outputPair = converter.convertRecord(record, conf);
-      sortedMap.put((WritableComparable) outputPair.getKey(), new KeyValuePair(
-          new IntWritable(i), record));
+      readers.get(i).next(convertedKey, value);
+      comparisonMap.put(convertedKey, new KeyValuePair(i, value));
     }
 
     while (readers.size() > 0) {
-      key = (Writable) ReflectionUtils.newInstance(keyClass, conf);
-      value = (Writable) ReflectionUtils.newInstance(valueClass, conf);
+      convertedKey = (WritableComparable) ReflectionUtils.newInstance(
+          convertedKeyClass, conf);
+      value = new MapWritable();
+
+      WritableComparable firstKey = comparisonMap.firstKey();
+      KeyValuePair kv = comparisonMap.get(firstKey);
+
+      int readerIndex = (Integer) kv.getKey();
+      MapWritable rawRecord = (MapWritable) kv.getValue();
+
+      for (Map.Entry<Writable, Writable> e : rawRecord.entrySet()) {
+        writer.append(e.getKey(), e.getValue());
+      }
 
-      WritableComparable firstKey = sortedMap.firstKey();
-      KeyValuePair kv = sortedMap.get(firstKey);
-      int readerIndex = ((IntWritable) kv.getKey()).get();
-      KeyValuePair rawRecord = (KeyValuePair) kv.getValue();
-      writer.append(rawRecord.getKey(), rawRecord.getValue());
-
-      sortedMap.remove(firstKey);
-
-      if (readers.get(readerIndex).next(key, value)) {
-        KeyValuePair record = new KeyValuePair(key, value);
-        outputPair = converter.convertRecord(record, conf);
-        sortedMap.put((WritableComparable) outputPair.getKey(),
-            new KeyValuePair(new IntWritable(readerIndex), record));
+      comparisonMap.remove(firstKey);
+
+      if (readers.get(readerIndex).next(convertedKey, value)) {
+        comparisonMap.put(convertedKey, new KeyValuePair(readerIndex, value));
       } else {
         readers.get(readerIndex).close();
         readers.remove(readerIndex);
       }
     }
 
-    sortedMap.clear();
+    comparisonMap.clear();
     writer.close();
+
+    deleteSlices(status);
   }
 
   @SuppressWarnings({ "rawtypes", "unchecked" })
   private void mergeFiles(FileStatus[] status, Path destinationFilePath,
-      Class keyClass, Class valueClass) throws IOException {
+      Class convertedKeyClass, Class rawKeyClass, Class rawValueClass)
+      throws IOException {
     SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
-        destinationFilePath, keyClass, valueClass, CompressionType.NONE);
+        destinationFilePath, rawKeyClass, rawValueClass, CompressionType.NONE);
     Writable key;
-    Writable value;
+    MapWritable rawRecord;
 
     for (int i = 0; i < status.length; i++) {
       SequenceFile.Reader reader = new SequenceFile.Reader(fs,
           status[i].getPath(), conf);
-      key = (Writable) ReflectionUtils.newInstance(keyClass, conf);
-      value = (Writable) ReflectionUtils.newInstance(valueClass, conf);
+      key = (Writable) ReflectionUtils.newInstance(convertedKeyClass, conf);
+      rawRecord = new MapWritable();
 
-      while (reader.next(key, value)) {
-        writer.append(key, value);
+      while (reader.next(key, rawRecord)) {
+        for (Map.Entry<Writable, Writable> e : rawRecord.entrySet()) {
+          writer.append(e.getKey(), e.getValue());
+        }
       }
       reader.close();
     }
     writer.close();
+
+    deleteSlices(status);
+  }
+
+  private void deleteSlices(FileStatus[] status) throws IOException {
+    for (int i = 0; i < status.length; i++) {
+      fs.delete(status[i].getPath(), true);
+      if (fs.exists(status[i].getPath().suffix(".sorted")))
+        fs.delete(status[i].getPath().suffix(".sorted"), true);
+    }
   }
 
   @Override
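
For reference, the record layout the runner now shuffles: each per-partition slice file is
keyed by the converted, comparable key, and the original record rides along in a single-entry
MapWritable so the merge step can restore the raw key/value pair verbatim. A small,
self-contained sketch of that round trip (the LongWritable/Text types are purely illustrative,
not dictated by the patch):

    import java.util.Map;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.MapWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;

    public class RawRecordWrapperSketch {
      public static void main(String[] args) {
        Writable rawKey = new LongWritable(42L);          // e.g. an input byte offset
        Writable rawValue = new Text("one input record");
        Text convertedKey = new Text("partition-sort-key");  // converter output

        // What bsp() now appends to a partition slice: (convertedKey, wrapped)
        MapWritable wrapped = new MapWritable();
        wrapped.put(rawKey, rawValue);
        // writerCache.get(index).append(convertedKey, wrapped);

        // What mergeFiles()/mergeSortedFiles() write to the final partition
        // file: the MapWritable is unwrapped so the raw pair is restored.
        for (Map.Entry<Writable, Writable> e : wrapped.entrySet()) {
          System.out.println(e.getKey() + "\t" + e.getValue());
          // writer.append(e.getKey(), e.getValue());
        }
      }
    }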