Posted to user@spark.apache.org by Steve Lewis <lo...@gmail.com> on 2014/09/13 07:32:05 UTC

Looking for a good sample of using Spark to do things Hadoop can do

Assume I have a large book with many chapters and many lines of text.
Assume I have a function that tells me the similarity of two lines of
text. The objective is to find, for each line, the most similar line in the
same chapter within 200 lines of it.
The real problem involves biology and is beyond the scope of this discussion.

In the code shown below I convert lines with their location into a Tuple2
where the location is the key.

Now I want to partition by chapter (I think maybe that is right).

Now for every chapter I want to look at the lines in order of location.
I want to keep the last 200 locations (as LineAndLocationMatch), search
them to update the best fit, and for every line add a best fit. When a line
is more than 200 away from the current line it can be added to the returned
JavaRDD.
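
One possible sketch of the windowed search described above, in plain Java with no Spark dependency. It assumes the input iterator is already ordered by chapter and then by line number, and it holds at most the lines inside the 200-line window in memory. The getSimilarity body is a hypothetical stand-in, the bestScore field and the tooFar helper are additions for illustration, and the class names mirror the pseudocode further down rather than any Spark API.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.Iterator;
import java.util.NoSuchElementException;

final class WindowedBestMatch {
    static final int WINDOW = 200; // maximum line distance, from the problem statement

    static final class LineAndLocation {
        final int chapter;
        final int lineNumber;
        final String line;

        LineAndLocation(int chapter, int lineNumber, String line) {
            this.chapter = chapter;
            this.lineNumber = lineNumber;
            this.line = line;
        }
    }

    static final class LineAndLocationMatch {
        final LineAndLocation thisLine;
        LineAndLocation bestFit; // stays null if no other line is in range
        double bestScore = Double.NEGATIVE_INFINITY; // assumption: added to track the best score

        LineAndLocationMatch(LineAndLocation thisLine) { this.thisLine = thisLine; }
    }

    // Hypothetical stand-in for the real similarity function:
    // the fraction of character positions at which the two strings agree.
    static double getSimilarity(String s1, String s2) {
        int longer = Math.max(s1.length(), s2.length());
        if (longer == 0) { return 1.0; }
        int shorter = Math.min(s1.length(), s2.length());
        int same = 0;
        for (int i = 0; i < shorter; i++) {
            if (s1.charAt(i) == s2.charAt(i)) { same++; }
        }
        return (double) same / longer;
    }

    // True when 'old' can no longer be matched against 'cur' or anything after it.
    static boolean tooFar(LineAndLocation old, LineAndLocation cur) {
        return old.chapter != cur.chapter || cur.lineNumber - old.lineNumber > WINDOW;
    }

    // Lazily turns sorted lines into matches; output order equals input order.
    static Iterator<LineAndLocationMatch> findBestMatches(final Iterator<LineAndLocation> sorted) {
        return new Iterator<LineAndLocationMatch>() {
            private final Deque<LineAndLocationMatch> window = new ArrayDeque<LineAndLocationMatch>();
            private final Deque<LineAndLocationMatch> ready = new ArrayDeque<LineAndLocationMatch>();

            private void fill() {
                while (ready.isEmpty() && sorted.hasNext()) {
                    LineAndLocationMatch cur = new LineAndLocationMatch(sorted.next());
                    // Lines from another chapter, or more than WINDOW lines back, are final.
                    while (!window.isEmpty() && tooFar(window.peekFirst().thisLine, cur.thisLine)) {
                        ready.addLast(window.pollFirst());
                    }
                    // Each comparison may improve the best fit of both lines involved.
                    for (LineAndLocationMatch old : window) {
                        double score = getSimilarity(old.thisLine.line, cur.thisLine.line);
                        if (score > old.bestScore) { old.bestScore = score; old.bestFit = cur.thisLine; }
                        if (score > cur.bestScore) { cur.bestScore = score; cur.bestFit = old.thisLine; }
                    }
                    window.addLast(cur);
                }
                if (ready.isEmpty()) { // input exhausted: whatever is left is final too
                    ready.addAll(window);
                    window.clear();
                }
            }

            @Override public boolean hasNext() { fill(); return !ready.isEmpty(); }

            @Override public LineAndLocationMatch next() {
                fill();
                if (ready.isEmpty()) { throw new NoSuchElementException(); }
                return ready.pollFirst();
            }

            @Override public void remove() { throw new UnsupportedOperationException(); }
        };
    }

    public static void main(String[] args) {
        Iterator<LineAndLocationMatch> it = findBestMatches(Arrays.asList(
                new LineAndLocation(1, 1, "aaaa"),
                new LineAndLocation(1, 2, "aaab"),
                new LineAndLocation(2, 1, "cccc")).iterator());
        while (it.hasNext()) {
            LineAndLocationMatch m = it.next();
            String fit = m.bestFit == null ? "none" : String.valueOf(m.bestFit.lineNumber);
            System.out.println(m.thisLine.chapter + ":" + m.thisLine.lineNumber + " -> " + fit);
        }
    }
}
```

Because the method takes an Iterator and returns an Iterator, it has roughly the shape of a per-partition function: Spark's mapPartitions hands its function an Iterator over one partition. How the result has to be returned depends on the Spark version, so treat the wiring as an open question rather than a given.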

I know how to do the map and generate doubles, but not how to do the sort
and reduce, or even what the reduce function arguments look like.
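
For the sort step, one possibility is a comparator that orders keys by chapter first and line number second, paired with a function that maps a chapter to a partition. The sketch below is plain Java so it can be tried without a cluster. ByChapterThenLine and partitionFor are hypothetical names, and the KeyClass here is a self-contained copy of the one in the pseudocode further down. In Spark 1.2 or later I believe a Partitioner plus a serializable Comparator like this can be passed to JavaPairRDD.repartitionAndSortWithinPartitions, but that wiring is an assumption and is not shown here.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

final class ChapterKeyOrdering {
    // Self-contained copy of the key: a location made of chapter and line number.
    static final class KeyClass implements Serializable {
        final int chapter;
        final int lineNumber;

        KeyClass(int chapter, int lineNumber) {
            this.chapter = chapter;
            this.lineNumber = lineNumber;
        }

        @Override public boolean equals(Object o) {
            return o instanceof KeyClass
                    && ((KeyClass) o).chapter == chapter
                    && ((KeyClass) o).lineNumber == lineNumber;
        }

        @Override public int hashCode() { return 31 * chapter + lineNumber; }

        @Override public String toString() { return chapter + ":" + lineNumber; }
    }

    // Orders by chapter, then by line number, so each chapter forms one contiguous run.
    // Serializable because a cluster framework would have to ship it to workers.
    static final class ByChapterThenLine implements Comparator<KeyClass>, Serializable {
        @Override public int compare(KeyClass a, KeyClass b) {
            if (a.chapter != b.chapter) { return a.chapter < b.chapter ? -1 : 1; }
            if (a.lineNumber != b.lineNumber) { return a.lineNumber < b.lineNumber ? -1 : 1; }
            return 0;
        }
    }

    // Every line of a chapter lands in the same partition; the result is never negative.
    static int partitionFor(KeyClass key, int numPartitions) {
        int mod = key.chapter % numPartitions;
        return mod < 0 ? mod + numPartitions : mod;
    }

    public static void main(String[] args) {
        List<KeyClass> keys = new ArrayList<KeyClass>(Arrays.asList(
                new KeyClass(2, 7), new KeyClass(1, 30), new KeyClass(2, 3), new KeyClass(1, 4)));
        Collections.sort(keys, new ByChapterThenLine());
        for (KeyClass key : keys) {
            System.out.println(key + " -> partition " + partitionFor(key, 20));
        }
    }
}
```

Sorting on chapter first matters when several chapters share a partition (chapter % 20 maps chapters 1 and 21 to the same place): the lines of each chapter still arrive as one uninterrupted, ordered run.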

Please use Java functions, not lambdas, in the sample. I am a strong typing
guy - returning JavaRDDs shows me the type at each step in a series of .
operations and really helps me understand what is happening.

I expect my reduceFunction to look like
 void reduceFunction(KeyClass key, Iterator<LineAndLocation> values)
but to have some way to accept the best fit LineAndLocationMatch objects
generated as the values are iterated.
There is no reason to think that the number of objects will fit in memory.

Also it is important for the function doing the reduce to know the key.
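
To make the expected shape concrete, here is a minimal plain-Java sketch of a Hadoop-style reduce over an already sorted stream: consecutive values with the same key are handed to a reduce function together with that key and an emitter for results. Emitter, KeyOf, ReduceFunction, reduceSortedRuns and countLinesPerChapter are all hypothetical names invented for this illustration, not Spark or Hadoop APIs. The sketch groups by chapter, on the assumption that the chapter is the real reduce key, since a key that also contains the line number would put every line in its own group.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

final class SortedRunReducer {
    interface Emitter<O> { void emit(O output); }
    interface KeyOf<K, V> { K keyOf(V value); }
    interface ReduceFunction<K, V, O> {
        void reduce(K key, Iterator<V> values, Emitter<O> out);
    }

    static final class LineAndLocation {
        final int chapter;
        final int lineNumber;
        final String line;

        LineAndLocation(int chapter, int lineNumber, String line) {
            this.chapter = chapter;
            this.lineNumber = lineNumber;
            this.line = line;
        }
    }

    // Holds exactly one element of read-ahead, so nothing is buffered per key.
    private static final class Lookahead<V> {
        private final Iterator<V> source;
        private V head;
        private boolean present;

        Lookahead(Iterator<V> source) { this.source = source; advance(); }
        boolean hasHead() { return present; }
        V head() { return head; }
        void advance() {
            present = source.hasNext();
            head = present ? source.next() : null;
        }
    }

    // Calls fn once per run of consecutive values that share a key.
    static <K, V, O> void reduceSortedRuns(Iterator<V> sorted, final KeyOf<K, V> keyOf,
                                           ReduceFunction<K, V, O> fn, Emitter<O> out) {
        final Lookahead<V> in = new Lookahead<V>(sorted);
        while (in.hasHead()) {
            final K key = keyOf.keyOf(in.head());
            Iterator<V> run = new Iterator<V>() {
                @Override public boolean hasNext() {
                    return in.hasHead() && key.equals(keyOf.keyOf(in.head()));
                }
                @Override public V next() {
                    if (!hasNext()) { throw new NoSuchElementException(); }
                    V value = in.head();
                    in.advance();
                    return value;
                }
                @Override public void remove() { throw new UnsupportedOperationException(); }
            };
            fn.reduce(key, run, out);
            while (run.hasNext()) { run.next(); } // skip values the reducer left unread
        }
    }

    // Demo reducer: counts the lines of each chapter and emits one summary per chapter.
    static List<String> countLinesPerChapter(Iterator<LineAndLocation> sorted) {
        final List<String> result = new ArrayList<String>();
        reduceSortedRuns(sorted,
                new KeyOf<Integer, LineAndLocation>() {
                    @Override public Integer keyOf(LineAndLocation value) { return value.chapter; }
                },
                new ReduceFunction<Integer, LineAndLocation, String>() {
                    @Override public void reduce(Integer chapter, Iterator<LineAndLocation> values,
                                                 Emitter<String> out) {
                        int count = 0;
                        while (values.hasNext()) { values.next(); count++; }
                        out.emit("chapter " + chapter + ": " + count + " lines");
                    }
                },
                new Emitter<String>() {
                    @Override public void emit(String output) { result.add(output); }
                });
        return result;
    }

    public static void main(String[] args) {
        System.out.println(countLinesPerChapter(Arrays.asList(
                new LineAndLocation(1, 1, "a"),
                new LineAndLocation(1, 2, "b"),
                new LineAndLocation(2, 1, "c")).iterator()));
    }
}
```

The demo collects its output in a list only to keep the example short; a reducer that must not hold everything in memory would pass each emitted object straight on to whatever consumes it.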

I am very lost as to what the reduce looks like. Under the covers, the reduce
involves a lot of Java code which knows very little about Spark and Hadoop.

My pseudocode looks like this, as far as I have it working:

    // one line in the book; Serializable so Spark can ship it between nodes
    static class LineAndLocation implements java.io.Serializable {
         int chapter;
         int lineNumber;
         String line;
    }

    // one line in the book together with its best match so far
    static class LineAndLocationMatch implements java.io.Serializable {
        LineAndLocation thisLine;
        LineAndLocation bestFit;
    }

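    // location - acts as a key, so it needs equals/hashCode and must be serializable
    static class KeyClass implements java.io.Serializable {
         final int chapter;
         final int lineNumber;

        KeyClass(final int pChapter, final int pLineNumber) {
            chapter = pChapter;
            lineNumber = pLineNumber;
        }

        @Override public boolean equals(final Object o) {
            return o instanceof KeyClass && ((KeyClass) o).chapter == chapter
                    && ((KeyClass) o).lineNumber == lineNumber;
        }

        @Override public int hashCode() {
            return 31 * chapter + lineNumber;
        }
    }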

    // used to compute the best fit
    public static class SimilarityFunction implements java.io.Serializable {
        double getSimilarity(String s1, String s2) {
            return 0; // TODO: do the real work here
        }
    }

    // This function returns an RDD of best match objects
    public static JavaRDD<LineAndLocationMatch> findBestMatchesLikeHadoop(JavaRDD<LineAndLocation> inputs) {

        // So this is what the mapper does - make key value pairs
        JavaPairRDD<KeyClass, LineAndLocation> mappedKeys =
                inputs.mapToPair(new PairFunction<LineAndLocation, KeyClass, LineAndLocation>() {
                    @Override
                    public Tuple2<KeyClass, LineAndLocation> call(final LineAndLocation v) throws Exception {
                        return new Tuple2<KeyClass, LineAndLocation>(new KeyClass(v.chapter, v.lineNumber), v);
                    }
                });

        // Partition by chapters ?? is this right??
        mappedKeys = mappedKeys.partitionBy(new Partitioner() {
            @Override public int numPartitions() {
                return 20;
            }

            @Override public int getPartition(final Object key) {
                return ((KeyClass)key).chapter % numPartitions();
            }
        });

        // Now I get very fuzzy - for every partition I want to sort on line number
        JavaPairRDD<KeyClass, LineAndLocation> sortedKeys = ??? WHAT HAPPENS HERE

        // Now I need to do a reduce operation. What I want is
        JavaRDD<LineAndLocationMatch> bestMatches = sortedKeys.<SOME FUNCTION>();

        return bestMatches;
    }