You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@hama.apache.org by "Garuda Kang (JIRA)" <ji...@apache.org> on 2014/02/28 05:02:19 UTC

[jira] [Commented] (HAMA-878) PartitioningRunner.Sorter doesn't allows duplicate key

    [ https://issues.apache.org/jira/browse/HAMA-878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13915436#comment-13915436 ] 

Garuda Kang commented on HAMA-878:
----------------------------------

I made some sample code:
{code}
/**
 * Sorts each input sequence file individually and then merges the sorted
 * files into one sequence file, keeping records whose keys are duplicated.
 *
 * <p>Every path in {@code pPath} is sorted into a sibling file named
 * {@code <path>.sorted}; an already existing file of that name is deleted
 * first. The merge repeatedly picks the smallest head record among the
 * inputs that still have records, appends it to the result, and reads the
 * next record from that same input. When several heads compare equal, the
 * input with the lowest index is written first.
 *
 * <p>An {@link IOException} is printed to standard error and not rethrown.
 *
 * @param pPath       paths of the sequence files to sort and merge
 * @param pPathResult path of the merged sequence file to create
 */
@SuppressWarnings({ "rawtypes", "unchecked" }) // WritableComparable is used raw, as in the rest of this sample
public void Sort(String[] pPath, String pPathResult) {
		Configuration conf = new Configuration();
		// One slot per input file; a slot is null once that input is closed.
		SequenceFile.Reader[] readers = new SequenceFile.Reader[pPath.length];
		SequenceFile.Writer writer = null;
		try {
			FileSystem fs = FileSystem.getLocal(conf);
			Path seqFilePath = new Path(pPathResult);

			// NOTE(review): the writer's key class comes from k while the sorter and
			// the merge keys come from wc — confirm both refer to the same class.
			writer = SequenceFile.createWriter(fs, conf, seqFilePath, k.getClass(), v.getClass(), CompressionType.NONE);

			// Phase 1: sort every input file on its own and open a reader on the result.
			for (int a = 0; a < pPath.length; a++) {
				SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs, wc.getClass(), v.getClass(), conf);
				sorter.setMemory(50 * 1024 * 1024);
				sorter.setFactor(10);
				Path inFile = new Path(pPath[a]);
				Path outFile = new Path(pPath[a].concat(".sorted"));
				if (fs.exists(outFile)) {
					fs.delete(outFile, true);
				}
				sorter.sort(inFile, outFile);
				readers[a] = new SequenceFile.Reader(fs, outFile, conf);
			}

			// Used instead of a key-indexed comparisonMap: the current head record of
			// each input, indexed by input number, so equal keys never overwrite each other.
			WritableComparable[] headKeys = new WritableComparable[pPath.length];
			Writable[] headValues = new Writable[pPath.length];

			// Phase 2: load the first record of every sorted file.
			for (int a = 0; a < readers.length; a++) {
				WritableComparable keyNew = (WritableComparable) ReflectionUtils.newInstance(wc.getClass(), conf);
				Writable valueNew = (Writable) ReflectionUtils.newInstance(v.getClass(), conf);
				if (readers[a].next(keyNew, valueNew)) {
					headKeys[a] = keyNew;
					headValues[a] = valueNew;
				} else {
					// Empty input: it must not contribute a default-constructed record.
					SequenceFile.Reader exhausted = readers[a];
					readers[a] = null;
					exhausted.close();
				}
			}

			// Phase 3: merge until no input has a head record left.
			while (true) {
				// Rescan from scratch each round so that a stale "lowest" key from an
				// exhausted input can never be written again.
				int indexLow = -1;
				for (int a = 0; a < headKeys.length; a++) {
					if (headKeys[a] == null) {
						continue;
					}
					if (indexLow < 0 || headKeys[a].compareTo(headKeys[indexLow]) < 0) {
						indexLow = a;
					}
				}
				if (indexLow < 0) {
					break;
				}

				writer.append(headKeys[indexLow], headValues[indexLow]);

				// Replace the record just written with the next one from the same input.
				WritableComparable keyNew = (WritableComparable) ReflectionUtils.newInstance(wc.getClass(), conf);
				Writable valueNew = (Writable) ReflectionUtils.newInstance(v.getClass(), conf);
				if (readers[indexLow].next(keyNew, valueNew)) {
					headKeys[indexLow] = keyNew;
					headValues[indexLow] = valueNew;
				} else {
					headKeys[indexLow] = null;
					headValues[indexLow] = null;
					SequenceFile.Reader exhausted = readers[indexLow];
					readers[indexLow] = null;
					exhausted.close();
				}
			}

			// Close on the normal path inside the try so that a failing close is reported
			// by the catch below; clear the field first so finally does not close twice.
			SequenceFile.Writer finished = writer;
			writer = null;
			finished.close();

			// confirm values
			/*
			SequenceFile.Reader resultReader = new SequenceFile.Reader(fs, seqFilePath, conf);
			WritableComparable resultKey = (WritableComparable)ReflectionUtils.newInstance(wc.getClass(), conf);
			Writable resultValue = (Writable)ReflectionUtils.newInstance(v.getClass(), conf);
			while(resultReader.next(resultKey, resultValue)){
				System.out.println(String.format("result [%s]\t[%s]", resultKey, resultValue));
			}
			*/

		} catch (IOException e) {
			e.printStackTrace();
		} finally {
			// Best-effort cleanup of whatever is still open after a failure.
			for (int a = 0; a < readers.length; a++) {
				if (readers[a] != null) {
					try {
						readers[a].close();
					} catch (IOException e) {
						e.printStackTrace();
					}
				}
			}
			if (writer != null) {
				try {
					writer.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
		}
	}
{code}

> PartitioningRunner.Sorter doesn't allows duplicate key
> ------------------------------------------------------
>
>                 Key: HAMA-878
>                 URL: https://issues.apache.org/jira/browse/HAMA-878
>             Project: Hama
>          Issue Type: Bug
>    Affects Versions: 0.7.0
>            Reporter: Edward J. Yoon
>            Assignee: Edward J. Yoon
>            Priority: Critical
>             Fix For: 0.7.0
>
>
> My mistake. I used a SortedMap to compare keys during the merge sort, so records that share a duplicate key can be silently dropped.
>   public SortedMap<WritableComparable, KeyValuePair<Integer, KeyValuePair>> comparisonMap = new TreeMap<WritableComparable, KeyValuePair<Integer, KeyValuePair>>();



--
This message was sent by Atlassian JIRA
(v6.1.5#6160)