Posted to user@spark.apache.org by "Devi P.V" <de...@gmail.com> on 2015/07/29 15:38:15 UTC

Count of distinct values in each column

Hi All,

I have a 5 GB CSV dataset with 69 columns. I need to find the count of
distinct values in each column. What is an optimized way to do this
using Spark with Scala?

Example CSV format :

a,b,c,d
a,c,b,a
b,b,c,d
b,b,c,a
c,b,b,a

Expected output:

(a,2),(b,2),(c,1) #- First column distinct count
(b,4),(c,1)       #- Second column distinct count
(c,3),(b,2)       #- Third column distinct count
(d,2),(a,3)       #- Fourth column distinct count


Thanks in Advance

Re: Count of distinct values in each column

Posted by Alexander Krasheninnikov <a....@corp.badoo.com>.
I made a naive implementation like this:

import java.util.ArrayList;
import java.util.List;

import com.google.common.collect.Lists;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

SparkConf conf = new SparkConf();
conf.setMaster("local[4]").setAppName("Stub");
final JavaSparkContext ctx = new JavaSparkContext(conf);

JavaRDD<String> input = ctx.textFile("path_to_file");

// explode each line into a list of column values
JavaRDD<List<String>> rowValues = input.map(new Function<String, List<String>>() {
     @Override
     public List<String> call(String v1) throws Exception {
         return Lists.newArrayList(v1.split(","));  // the example data is comma-separated
     }
});

// transform the input into key (word, colNumber), value => 1
JavaPairRDD<Tuple2<String, Integer>, Integer> positions = rowValues.flatMapToPair(new PairFlatMapFunction<List<String>, Tuple2<String, Integer>, Integer>() {
     @Override
     public Iterable<Tuple2<Tuple2<String, Integer>, Integer>> call(List<String> strings) throws Exception {
         List<Tuple2<Tuple2<String, Integer>, Integer>> retval = new ArrayList<>();

         int colNum = -1;
         for (String word : strings) {
             Tuple2<String, Integer> wordPosition = new Tuple2<>(word, ++colNum);
             retval.add(new Tuple2<>(wordPosition, 1));
         }
         return retval;
     }
});

// sum the word counts within each column
JavaPairRDD<Tuple2<String, Integer>, Integer> sums = positions.reduceByKey(new Function2<Integer, Integer, Integer>() {
     @Override
     public Integer call(Integer v1, Integer v2) throws Exception {
         return v1 + v2;
     }
});

// invert the pair: make the column number the key and (word, count) the value
JavaPairRDD<Integer, Tuple2<String, Integer>> columnIsKey = sums.mapToPair(new PairFunction<Tuple2<Tuple2<String, Integer>, Integer>, Integer, Tuple2<String, Integer>>() {
     @Override
     public Tuple2<Integer, Tuple2<String, Integer>> call(Tuple2<Tuple2<String, Integer>, Integer> t) throws Exception {
         return new Tuple2<>(t._1()._2(), new Tuple2<>(t._1()._1(), t._2()));
     }
});

// some optimizations to avoid groupByKey could be applied here (one option is sketched below)
JavaPairRDD<Integer, Iterable<Tuple2<String, Integer>>> grouped = columnIsKey.groupByKey();

// output the results
grouped.foreach(new VoidFunction<Tuple2<Integer, Iterable<Tuple2<String, Integer>>>>() {
     @Override
     public void call(Tuple2<Integer, Iterable<Tuple2<String, Integer>>> column) throws Exception {
         StringBuilder strValues = new StringBuilder();
         for (Tuple2<String, Integer> distinct : column._2()) {
             strValues.append("(").append(distinct._1()).append(",").append(distinct._2()).append(")");
         }
         System.out.println("Column: " + column._1());
         System.out.println("Distincts: " + strValues);
     }
});
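
Since the original question asked for Scala, here is a minimal sketch of the
same approach in Scala. It is a sketch under assumptions, not a drop-in
implementation: the input path is a placeholder and the comma separator is
taken from the example data.

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setMaster("local[4]").setAppName("DistinctPerColumn")
val sc = new SparkContext(conf)

// emit one ((columnIndex, value), 1) record per cell, then sum the counts
val counts = sc.textFile("path_to_file")            // placeholder path
  .flatMap(_.split(",").zipWithIndex)               // (value, columnIndex)
  .map { case (value, col) => ((col, value), 1) }
  .reduceByKey(_ + _)                               // ((col, value), count)

// regroup by column index; after reduceByKey the result is small
// (one record per distinct value per column), so collecting it is fine
counts
  .map { case ((col, value), cnt) => (col, (value, cnt)) }
  .groupByKey()
  .sortByKey()
  .collect()
  .foreach { case (col, values) =>
    println(s"Column $col: " + values.map { case (v, c) => s"($v,$c)" }.mkString(","))
  }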

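As for the comment about avoiding groupByKey: after reduceByKey there is only
one record left per distinct (column, value) pair, so one option is to merge
small per-column maps with a second reduceByKey instead of grouping. A sketch
building on the counts RDD above:

// every (col, value) key is unique after reduceByKey, so the partial maps
// being merged have disjoint key sets and ++ is a correct merge
val perColumn = counts
  .map { case ((col, value), cnt) => (col, Map(value -> cnt)) }
  .reduceByKey(_ ++ _)                              // (col, Map(value -> count))

perColumn.sortByKey().collect().foreach { case (col, m) =>
  println(s"Column $col: " + m.map { case (v, c) => s"($v,$c)" }.mkString(","))
}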

