You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@hama.apache.org by "Garuda Kang (JIRA)" <ji...@apache.org> on 2014/02/28 05:02:19 UTC
[jira] [Commented] (HAMA-878) PartitioningRunner.Sorter doesn't
allows duplicate key
[ https://issues.apache.org/jira/browse/HAMA-878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13915436#comment-13915436 ]
Garuda Kang commented on HAMA-878:
----------------------------------
i made sample code
{code}
public void Sort(String[] pPath, String pPathResult) {
Configuration conf = new Configuration();
FileSystem fs;
try {
fs = FileSystem.getLocal(conf);
Path seqFilePath = new Path(pPathResult);
SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, seqFilePath, k.getClass(), v.getClass(), CompressionType.NONE);
Map<Integer, SequenceFile.Reader> readers = new HashMap<Integer, SequenceFile.Reader>();
for(int a=0;a<pPath.length;a++) {
SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs, wc.getClass(), v.getClass(), conf);
sorter.setMemory(50*1024*1024);
sorter.setFactor(10);
Path inFile = new Path(pPath[a]);
Path outFile = new Path(pPath[a].concat(".sorted"));
if(fs.exists(outFile)) {
fs.delete(outFile, true);
}
sorter.sort(inFile, outFile);
readers.put(a, new SequenceFile.Reader(fs, outFile, conf));
}
WritableComparable KeyLow = (WritableComparable)ReflectionUtils.newInstance(wc.getClass(), conf);
Writable ValueLow = (Writable)ReflectionUtils.newInstance(v.getClass(), conf);
int IndexLow = 0;
// instead comparisonMap
Map<Integer,Map<WritableComparable,Writable>> ValsMap = new HashMap<Integer,Map<WritableComparable,Writable>>();
// set default value (firt record from sorted files)
for(int a=0;a<readers.size();a++) {
WritableComparable KeyNew = (WritableComparable)ReflectionUtils.newInstance(wc.getClass(), conf);
Writable ValueNew = (Writable)ReflectionUtils.newInstance(v.getClass(), conf);
readers.get(a).next(KeyNew, ValueNew);
Map<WritableComparable,Writable> innerMap = new HashMap<WritableComparable,Writable>();
innerMap.put(KeyNew, ValueNew);
ValsMap.put(a, innerMap);
if(a == 0) {
KeyLow = KeyNew;
ValueLow = ValueNew;
}
}
int cnt = 0;
while(readers.size() > 0) {
for(Map.Entry<Integer,Map<WritableComparable,Writable>> e1 : ValsMap.entrySet()) {
for(Map.Entry<WritableComparable, Writable> e2 : e1.getValue().entrySet()) {
// compare low value & set low value
if(e2.getKey().compareTo(KeyLow) <= 0) {
KeyLow = e2.getKey();
ValueLow = e2.getValue();
IndexLow = e1.getKey();
}
}
}
writer.append(KeyLow, ValueLow);
ValsMap.remove(IndexLow);
WritableComparable KeyNew = (WritableComparable)ReflectionUtils.newInstance(wc.getClass(), conf);
Writable ValueNew = (Writable)ReflectionUtils.newInstance(v.getClass(), conf);
if(readers.containsKey(IndexLow)) {
// get New value
if(readers.get(IndexLow).next(KeyNew, ValueNew)) {
Map<WritableComparable,Writable> innerMap = new HashMap<WritableComparable,Writable>();
innerMap.put(KeyNew, ValueNew);
ValsMap.put(IndexLow, innerMap);
KeyLow = KeyNew;
ValueLow = ValueNew;
}else{
readers.get(IndexLow).close();
readers.remove(IndexLow);
}
}
}
writer.close();
// confirm values
/*
SequenceFile.Reader resultReader = new SequenceFile.Reader(fs, seqFilePath, conf);
WritableComparable KeyNew = (WritableComparable)ReflectionUtils.newInstance(wc.getClass(), conf);
Writable ValueNew = (Writable)ReflectionUtils.newInstance(v.getClass(), conf);
while(resultReader.next(KeyNew, ValueNew)){
System.out.println(String.format("result [%s]\t[%s]", KeyNew, ValueNew));
}
*/
} catch (IOException e) {
e.printStackTrace();
}
}
{code}
> PartitioningRunner.Sorter doesn't allows duplicate key
> ------------------------------------------------------
>
> Key: HAMA-878
> URL: https://issues.apache.org/jira/browse/HAMA-878
> Project: Hama
> Issue Type: Bug
> Affects Versions: 0.7.0
> Reporter: Edward J. Yoon
> Assignee: Edward J. Yoon
> Priority: Critical
> Fix For: 0.7.0
>
>
> My mistake. I used SortedMap to compare keys when merge sort. So, some records contains duplicate key can be ignored.
> public SortedMap<WritableComparable, KeyValuePair<Integer, KeyValuePair>> comparisonMap = new TreeMap<WritableComparable, KeyValuePair<Integer, KeyValuePair>>();
--
This message was sent by Atlassian JIRA
(v6.1.5#6160)