Posted to commits@hama.apache.org by ed...@apache.org on 2014/02/04 06:44:26 UTC
svn commit: r1564208 - in /hama/trunk: conf/hama-default.xml core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
Author: edwardyoon
Date: Tue Feb 4 05:44:26 2014
New Revision: 1564208
URL: http://svn.apache.org/r1564208
Log:
HAMA-865: Use SequenceFile.Sorter in PartitioningRunner instead of a memory-based sorted map.
Modified:
hama/trunk/conf/hama-default.xml
hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
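
In outline, the change swaps the old in-memory TreeMap sort (which buffered every record before writing) for Hadoop's SequenceFile.Sorter, which sorts each partition slice on disk under a bounded memory budget. A minimal sketch of the Sorter usage this commit wires up, assuming a Text key and the MapWritable value introduced below (the file path and class choices are illustrative, not part of the commit):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SorterSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);

    // External sort of a SequenceFile<Text, MapWritable> on its key,
    // spilling to disk instead of holding all records in memory.
    SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs, Text.class,
        MapWritable.class, conf);
    // Buffer size and merge fan-in, read from the two new properties below.
    sorter.setMemory(conf.getInt("bsp.input.runtime.partitioning.sort.mb",
        50) * 1024 * 1024);
    sorter.setFactor(conf.getInt(
        "bsp.input.runtime.partitioning.sort.factor", 10));

    Path in = new Path("/tmp/part-0/file-0"); // illustrative path
    sorter.sort(in, in.suffix(".sorted"));    // sorted copy beside the input
  }
}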
Modified: hama/trunk/conf/hama-default.xml
URL: http://svn.apache.org/viewvc/hama/trunk/conf/hama-default.xml?rev=1564208&r1=1564207&r2=1564208&view=diff
==============================================================================
--- hama/trunk/conf/hama-default.xml (original)
+++ hama/trunk/conf/hama-default.xml Tue Feb 4 05:44:26 2014
@@ -172,6 +172,19 @@
</description>
</property>
<property>
+ <name>bsp.input.runtime.partitioning.sort.mb</name>
+ <value>50</value>
+ <description>The total amount of buffer memory to use while sorting files, in MB.
+ </description>
+ </property>
+ <property>
+ <name>bsp.input.runtime.partitioning.sort.factor</name>
+ <value>10</value>
+ <description>The maximum number of streams to merge at once; the default is 10.
+ </description>
+ </property>
+
+ <property>
<name>io.serializations</name>
<value>org.apache.hadoop.io.serializer.WritableSerialization,org.apache.hadoop.io.serializer.JavaSerialization</value>
<description>The default IO serialization protocol for HDFS I/O</description>
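
For reference, the two new knobs can be overridden per cluster in conf/hama-site.xml like any other property; the values below are illustrative, not recommendations:

<property>
  <name>bsp.input.runtime.partitioning.sort.mb</name>
  <value>100</value>
</property>
<property>
  <name>bsp.input.runtime.partitioning.sort.factor</name>
  <value>25</value>
</property>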
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java?rev=1564208&r1=1564207&r2=1564208&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java Tue Feb 4 05:44:26 2014
@@ -21,7 +21,6 @@ import java.io.IOException;
import java.lang.reflect.Constructor;
import java.util.HashMap;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.SortedMap;
import java.util.TreeMap;
@@ -31,7 +30,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
@@ -134,58 +133,51 @@ public class PartitioningRunner extends
public Map<Integer, SequenceFile.Writer> writerCache = new HashMap<Integer, SequenceFile.Writer>();
- @SuppressWarnings("rawtypes")
- public SortedMap<WritableComparable, KeyValuePair<IntWritable, KeyValuePair>> sortedMap = new TreeMap<WritableComparable, KeyValuePair<IntWritable, KeyValuePair>>();
-
@Override
- @SuppressWarnings({ "rawtypes", "unchecked" })
+ @SuppressWarnings({ "rawtypes" })
public void bsp(
BSPPeer<Writable, Writable, Writable, Writable, NullWritable> peer)
throws IOException, SyncException, InterruptedException {
int peerNum = peer.getNumPeers();
Partitioner partitioner = getPartitioner();
- KeyValuePair<Writable, Writable> pair = null;
- KeyValuePair<Writable, Writable> outputPair = null;
+ KeyValuePair<Writable, Writable> rawRecord = null;
+ KeyValuePair<Writable, Writable> convertedRecord = null;
- Class keyClass = null;
- Class valueClass = null;
- while ((pair = peer.readNext()) != null) {
- if (keyClass == null && valueClass == null) {
- keyClass = pair.getKey().getClass();
- valueClass = pair.getValue().getClass();
+ Class convertedKeyClass = null;
+ Class rawKeyClass = null;
+ Class rawValueClass = null;
+ MapWritable raw = null;
+
+ while ((rawRecord = peer.readNext()) != null) {
+ if (rawKeyClass == null && rawValueClass == null) {
+ rawKeyClass = rawRecord.getKey().getClass();
+ rawValueClass = rawRecord.getValue().getClass();
}
- outputPair = converter.convertRecord(pair, conf);
+ convertedRecord = converter.convertRecord(rawRecord, conf);
- if (outputPair == null) {
+ if (convertedRecord == null) {
continue;
}
+ Writable convertedKey = convertedRecord.getKey();
+ convertedKeyClass = convertedKey.getClass();
- int index = converter.getPartitionId(outputPair, partitioner, conf, peer,
- desiredNum);
-
- // if key is comparable and it need to be sorted by key,
- if (outputPair.getKey() instanceof WritableComparable
- && conf.getBoolean(Constants.PARTITION_SORT_BY_KEY, false)) {
- sortedMap.put(
- (WritableComparable) outputPair.getKey(),
- new KeyValuePair(new IntWritable(index), new KeyValuePair(pair
- .getKey(), pair.getValue())));
- } else {
- if (!writerCache.containsKey(index)) {
- Path destFile = new Path(partitionDir + "/part-" + index + "/file-"
- + peer.getPeerIndex());
- SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
- destFile, keyClass, valueClass, CompressionType.NONE);
- writerCache.put(index, writer);
- }
+ int index = converter.getPartitionId(convertedRecord, partitioner, conf,
+ peer, desiredNum);
- writerCache.get(index).append(pair.getKey(), pair.getValue());
+ if (!writerCache.containsKey(index)) {
+ Path destFile = new Path(partitionDir + "/part-" + index + "/file-"
+ + peer.getPeerIndex());
+ SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
+ destFile, convertedKeyClass, MapWritable.class,
+ CompressionType.NONE);
+ writerCache.put(index, writer);
}
- }
- if (sortedMap.size() > 0) {
- writeSortedFile(peer.getPeerIndex(), keyClass, valueClass);
+ raw = new MapWritable();
+ raw.put(rawRecord.getKey(), rawRecord.getValue());
+
+ writerCache.get(index).append(convertedKey, raw);
}
for (SequenceFile.Writer w : writerCache.values()) {
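
The hunk above is the core of the new write path: instead of parking records in sortedMap, each record is appended immediately, keyed by its converted (sortable) key, with the raw key/value pair wrapped in a single-entry MapWritable so it survives the external sort intact. A hedged sketch of that wrapping, with Text and LongWritable standing in for the job's actual key/value classes:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;

public class WrapSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
        new Path("/tmp/part-0/file-0"), Text.class, MapWritable.class,
        CompressionType.NONE);

    MapWritable raw = new MapWritable();
    raw.put(new LongWritable(42), new Text("raw value")); // one raw pair
    writer.append(new Text("converted sort key"), raw);   // sortable key outside
    writer.close();
  }
}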
@@ -206,11 +198,13 @@ public class PartitioningRunner extends
+ getPartitionName(partitionID));
FileStatus[] files = fs.listStatus(stat.getPath());
- if (outputPair.getKey() instanceof WritableComparable
+ if (convertedRecord.getKey() instanceof WritableComparable
&& conf.getBoolean(Constants.PARTITION_SORT_BY_KEY, false)) {
- mergeSortedFiles(files, destinationFilePath, keyClass, valueClass);
+ mergeSortedFiles(files, destinationFilePath, convertedKeyClass,
+ rawKeyClass, rawValueClass);
} else {
- mergeFiles(files, destinationFilePath, keyClass, valueClass);
+ mergeFiles(files, destinationFilePath, convertedKeyClass,
+ rawKeyClass, rawValueClass);
}
fs.delete(stat.getPath(), true);
}
@@ -218,99 +212,105 @@ public class PartitioningRunner extends
}
@SuppressWarnings("rawtypes")
- private void writeSortedFile(int peerIndex, Class keyClass, Class valueClass)
- throws IOException {
- for (Entry<WritableComparable, KeyValuePair<IntWritable, KeyValuePair>> e : sortedMap
- .entrySet()) {
- int index = ((IntWritable) e.getValue().getKey()).get();
- KeyValuePair rawRecord = e.getValue().getValue();
-
- if (!writerCache.containsKey(index)) {
- Path destFile = new Path(partitionDir + "/part-" + index + "/file-"
- + peerIndex);
- SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
- destFile, keyClass, valueClass, CompressionType.NONE);
- writerCache.put(index, writer);
- }
-
- writerCache.get(index).append(rawRecord.getKey(), rawRecord.getValue());
- }
-
- sortedMap.clear();
- }
+ public SortedMap<WritableComparable, KeyValuePair<Integer, KeyValuePair>> comparisonMap = new TreeMap<WritableComparable, KeyValuePair<Integer, KeyValuePair>>();
@SuppressWarnings({ "rawtypes", "unchecked" })
private void mergeSortedFiles(FileStatus[] status, Path destinationFilePath,
- Class keyClass, Class valueClass) throws IOException {
+ Class convertedKeyClass, Class rawKeyClass, Class rawValueClass)
+ throws IOException {
SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
- destinationFilePath, keyClass, valueClass, CompressionType.NONE);
- KeyValuePair outputPair = null;
- Writable key;
- Writable value;
+ destinationFilePath, rawKeyClass, rawValueClass, CompressionType.NONE);
+ WritableComparable convertedKey;
+ MapWritable value;
Map<Integer, SequenceFile.Reader> readers = new HashMap<Integer, SequenceFile.Reader>();
for (int i = 0; i < status.length; i++) {
- readers.put(i, new SequenceFile.Reader(fs, status[i].getPath(), conf));
+ SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs,
+ convertedKeyClass, MapWritable.class, conf);
+ sorter.setMemory(conf
+ .getInt("bsp.input.runtime.partitioning.sort.mb", 50) * 1024 * 1024);
+ sorter.setFactor(conf.getInt(
+ "bsp.input.runtime.partitioning.sort.factor", 10));
+ sorter.sort(status[i].getPath(), status[i].getPath().suffix(".sorted"));
+
+ readers.put(i,
+ new SequenceFile.Reader(fs, status[i].getPath().suffix(".sorted"),
+ conf));
}
for (int i = 0; i < readers.size(); i++) {
- key = (Writable) ReflectionUtils.newInstance(keyClass, conf);
- value = (Writable) ReflectionUtils.newInstance(valueClass, conf);
+ convertedKey = (WritableComparable) ReflectionUtils.newInstance(
+ convertedKeyClass, conf);
+ value = new MapWritable();
- readers.get(i).next(key, value);
- KeyValuePair record = new KeyValuePair(key, value);
- outputPair = converter.convertRecord(record, conf);
- sortedMap.put((WritableComparable) outputPair.getKey(), new KeyValuePair(
- new IntWritable(i), record));
+ readers.get(i).next(convertedKey, value);
+ comparisonMap.put(convertedKey, new KeyValuePair(i, value));
}
while (readers.size() > 0) {
- key = (Writable) ReflectionUtils.newInstance(keyClass, conf);
- value = (Writable) ReflectionUtils.newInstance(valueClass, conf);
+ convertedKey = (WritableComparable) ReflectionUtils.newInstance(
+ convertedKeyClass, conf);
+ value = new MapWritable();
+
+ WritableComparable firstKey = comparisonMap.firstKey();
+ KeyValuePair kv = comparisonMap.get(firstKey);
+
+ int readerIndex = (Integer) kv.getKey();
+ MapWritable rawRecord = (MapWritable) kv.getValue();
+
+ for (Map.Entry<Writable, Writable> e : rawRecord.entrySet()) {
+ writer.append(e.getKey(), e.getValue());
+ }
- WritableComparable firstKey = sortedMap.firstKey();
- KeyValuePair kv = sortedMap.get(firstKey);
- int readerIndex = ((IntWritable) kv.getKey()).get();
- KeyValuePair rawRecord = (KeyValuePair) kv.getValue();
- writer.append(rawRecord.getKey(), rawRecord.getValue());
-
- sortedMap.remove(firstKey);
-
- if (readers.get(readerIndex).next(key, value)) {
- KeyValuePair record = new KeyValuePair(key, value);
- outputPair = converter.convertRecord(record, conf);
- sortedMap.put((WritableComparable) outputPair.getKey(),
- new KeyValuePair(new IntWritable(readerIndex), record));
+ comparisonMap.remove(firstKey);
+
+ if (readers.get(readerIndex).next(convertedKey, value)) {
+ comparisonMap.put(convertedKey, new KeyValuePair(readerIndex, value));
} else {
readers.get(readerIndex).close();
readers.remove(readerIndex);
}
}
- sortedMap.clear();
+ comparisonMap.clear();
writer.close();
+
+ deleteSlices(status);
}
@SuppressWarnings({ "rawtypes", "unchecked" })
private void mergeFiles(FileStatus[] status, Path destinationFilePath,
- Class keyClass, Class valueClass) throws IOException {
+ Class convertedKeyClass, Class rawKeyClass, Class rawValueClass)
+ throws IOException {
SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
- destinationFilePath, keyClass, valueClass, CompressionType.NONE);
+ destinationFilePath, rawKeyClass, rawValueClass, CompressionType.NONE);
Writable key;
- Writable value;
+ MapWritable rawRecord;
for (int i = 0; i < status.length; i++) {
SequenceFile.Reader reader = new SequenceFile.Reader(fs,
status[i].getPath(), conf);
- key = (Writable) ReflectionUtils.newInstance(keyClass, conf);
- value = (Writable) ReflectionUtils.newInstance(valueClass, conf);
+ key = (Writable) ReflectionUtils.newInstance(convertedKeyClass, conf);
+ rawRecord = new MapWritable();
- while (reader.next(key, value)) {
- writer.append(key, value);
+ while (reader.next(key, rawRecord)) {
+ for (Map.Entry<Writable, Writable> e : rawRecord.entrySet()) {
+ writer.append(e.getKey(), e.getValue());
+ }
}
reader.close();
}
writer.close();
+
+ deleteSlices(status);
+ }
+
+ private void deleteSlices(FileStatus[] status) throws IOException {
+ for (int i = 0; i < status.length; i++) {
+ fs.delete(status[i].getPath(), true);
+ if (fs.exists(status[i].getPath().suffix(".sorted")))
+ fs.delete(status[i].getPath().suffix(".sorted"), true);
+ }
}
@Override
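
A closing note on the merge: mergeSortedFiles keeps just one buffered record per open reader, using a TreeMap ordered by the converted key as a stand-in for a priority queue: pop the globally smallest key, unwrap its MapWritable, and refill from the reader it came from. A self-contained sketch of that k-way merge pattern over plain iterators (not the Hama/Hadoop classes; note that, as in the committed code, a TreeMap keeps one entry per key, so this assumes distinct keys):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class KWayMergeSketch {
  // Merge k sorted lists via a TreeMap keyed by the current head of each
  // list, mirroring the comparisonMap loop above.
  public static List<Integer> merge(List<List<Integer>> sortedLists) {
    TreeMap<Integer, Iterator<Integer>> heads = new TreeMap<Integer, Iterator<Integer>>();
    for (List<Integer> list : sortedLists) {
      Iterator<Integer> it = list.iterator();
      if (it.hasNext())
        heads.put(it.next(), it); // prime with each list's first key
    }
    List<Integer> out = new ArrayList<Integer>();
    while (!heads.isEmpty()) {
      Map.Entry<Integer, Iterator<Integer>> smallest = heads.pollFirstEntry();
      out.add(smallest.getKey());   // emit the global minimum
      Iterator<Integer> it = smallest.getValue();
      if (it.hasNext())
        heads.put(it.next(), it);   // refill from the same source
    }
    return out;
  }

  public static void main(String[] args) {
    System.out.println(merge(Arrays.asList(
        Arrays.asList(1, 4, 7), Arrays.asList(2, 5), Arrays.asList(3, 6))));
    // -> [1, 2, 3, 4, 5, 6, 7]
  }
}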