Posted to user@spark.apache.org by "ruan.answer" <ru...@gmail.com> on 2016/11/14 07:26:28 UTC

handle data skew problem when calculating word count and word dependency

I am planning to calculate word counts and two-word dependencies with Spark,
but the data is skewed. How can I solve this problem?
And do you have any suggestions about two-level data slicing?

I have some topics, and each topic corresponds to a large amount of text, so I
have an RDD with this structure:

    JavaPairRDD<String, Iterable<String>> topicGroup

Then I want to compute word counts and word dependencies to generate text
patterns for each topic.
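For context, topicGroup is built roughly like this (sc is the JavaSparkContext;
the input path and the extractTopic helper are just placeholders, not my exact
code):

    JavaRDD<String> rawLines = sc.textFile("hdfs:///path/to/logs");   // placeholder input path
    JavaPairRDD<String, Iterable<String>> topicGroup = rawLines
            .mapToPair((PairFunction<String, String, String>) line ->
                    new Tuple2<>(extractTopic(line), line))           // extractTopic is a placeholder helper of mine
            .groupByKey();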
Unfortunately, some topics have very little data (around 10 KB) while others
have a lot (around 5 GB), and this imbalance is driving me crazy. So I have
this solution:

        List<String> topics = topicsGroup.keys().collect();
        LOG.warn("topics group key: " + topics.size());
        for (String key : topics) {
            JavaRDD<String> valuesRDD = getValueRDDByKey(topicsGroup, key);
            long lineCount = valuesRDD.count();
            int minNums = 10;
            List<String> lines = valuesRDD.collect();
            JavaRDD<String[]> agentWordsRDD = valuesRDD
                    .map((Function<String, String[]>) v1 -> splitAndParseLogLine(v1, logMiningOptions))
                    .cache();

            // word counts for this topic, filtered and collected into a driver-side dictionary
            Map<String, Integer> dict = agentWordsRDD
                    .flatMap((FlatMapFunction<String[], String>) strings -> Arrays.asList(strings))
                    .mapToPair((PairFunction<String, String, Integer>) s -> new Tuple2<>(s, 1))
                    .reduceByKey((Function2<Integer, Integer, Integer>) (v1, v2) -> v1 + v2)
                    .filter((Function<Tuple2<String, Integer>, Boolean>) v1 ->
                            v1._2() < minNums || v1._1().length() < 1)
                    .collectAsMap();

            // word dependency: for each line, keep the words that are in dict and
            // record how many other words appear between consecutive dict words
            JavaRDD<Tuple2<List<String>, List<Integer>>> tupleVarsRDD = agentWordsRDD
                    .map((Function<String[], Tuple2<List<String>, List<Integer>>>) words -> {
                        List<String> tuple = new ArrayList<String>();
                        List<Integer> vars = new ArrayList<Integer>();
                        int v = 0;
                        for (String word : words) {
                            if (dict.containsKey(word)) {
                                tuple.add(word);
                                vars.add(v);
                                v = 0;
                            } else {
                                ++v;
                            }
                        }
                        vars.add(v);
                        return new Tuple2<>(tuple, vars);
                    });
            List<Tuple2<List<String>, List<Integer>>> tupleVars = tupleVarsRDD.collect();

            LogMining logMining = new LogMining(logMiningOptions);
            Tuple2<List<Candidates>, Integer> patterns =
                    new Tuple2<>(logMining.batchProcess(tupleVars, dict), logMining.getLineCount());
            putPatternsToDatabase(logFileDate, key, patterns);
        }
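
One direction I am not sure about is keying everything by (topic, word), so
that the word counts for all topics run in a single job instead of one job per
key. This is only a rough, untested sketch of what I mean, reusing topicGroup
and splitAndParseLogLine from above:

    JavaPairRDD<Tuple2<String, String>, Integer> topicWordCounts = topicGroup
            .flatMapValues((Function<Iterable<String>, Iterable<String>>) values -> values)   // (topic, line)
            .flatMapValues((Function<String, Iterable<String>>) line ->
                    Arrays.asList(splitAndParseLogLine(line, logMiningOptions)))              // (topic, word)
            .mapToPair((PairFunction<Tuple2<String, String>, Tuple2<String, String>, Integer>) tw ->
                    new Tuple2<>(new Tuple2<>(tw._1(), tw._2()), 1))                          // ((topic, word), 1)
            .reduceByKey((Function2<Integer, Integer, Integer>) (a, b) -> a + b);

But I am not sure this helps with the dependency/pattern step, because that
step still needs all the lines of a topic together.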
But as you can see, my current code still does not solve the data skew problem,
and I create a separate RDD for every key, which adds a lot of redundant
operators. So my question is: do you have any advice or examples on how to
handle this? And when the data is skewed, how can I cut the data into a fixed
number of roughly equal slices by count (16, or some other number), instead of
sorting by word length like this:

    JavaRDD<String> valuesRDDPartition = valuesRDD.sortBy(new Function<String, Integer>() {
        @Override
        public Integer call(String v1) throws Exception {
            return logMiningOptions.getSplitWord().splitWord(v1).length;
        }
    }, true, 16);
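
To make that last question concrete, this is roughly what I mean by cutting by
count (numChunks = 16 is arbitrary, and this is untested):

    int numChunks = 16;   // arbitrary slice count
    JavaPairRDD<Long, String> chunked = valuesRDD
            .zipWithIndex()                                           // (line, sequential index)
            .mapToPair((PairFunction<Tuple2<String, Long>, Long, String>) t ->
                    new Tuple2<>(t._2() % numChunks, t._1()));        // (chunk id, line)
    // chunked.groupByKey() would then give roughly equal-sized slices per chunk id,
    // regardless of line length.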



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/handle-data-skew-problem-when-calculating-word-count-and-word-dependency-tp28068.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org