Posted to user@spark.apache.org by "ruan.answer" <ru...@gmail.com> on 2016/11/14 07:26:28 UTC
Handling data skew when calculating word count and word dependency
I am planning to calculate word counts and two-word dependencies with Spark,
but the data is skewed. How can I solve this problem?
And do you have any suggestions about two-level data slicing?
I have some topics, and each topic corresponds to a lot of text, so I have
an RDD structure like this:
JavaPairRDD<String, Iterable<String>> topicsGroup
Then I want to compute word counts and word dependencies to generate text
patterns for each topic.
Unfortunately, some topics have very little data (about 10 KB) while others
have a lot (about 5 GB), and this imbalance drives me crazy. So I came up with
this solution:
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;

List<String> topics = topicsGroup.keys().collect();
LOG.warn("topics group key: " + topics.size());
for (String key : topics) {
    // A separate filtered RDD per topic: every iteration runs its own Spark jobs.
    JavaRDD<String> valuesRDD = getValueRDDByKey(topicsGroup, key);
    final int minNums = 10;  // minimum count for a word to enter the dictionary
    // Split every line into words; cached because it is used twice below.
    JavaRDD<String[]> agentWordsRDD = valuesRDD
            .map(line -> splitAndParseLogLine(line, logMiningOptions))
            .cache();
    // Word count for this topic. filter() KEEPS elements for which the
    // predicate is true, so keep non-empty words seen at least minNums times.
    Map<String, Integer> dict = agentWordsRDD
            .flatMap(words -> Arrays.asList(words).iterator())  // Spark 2.x expects an Iterator here
            .mapToPair(word -> new Tuple2<>(word, 1))
            .reduceByKey((a, b) -> a + b)
            .filter(t -> t._2() >= minNums && !t._1().isEmpty())
            .collectAsMap();
    // Per line: the dictionary words in order, plus how many other (variable)
    // words appear before, between and after them.
    JavaRDD<Tuple2<List<String>, List<Integer>>> tupleVarsRDD = agentWordsRDD.map(words -> {
        List<String> tuple = new ArrayList<>();
        List<Integer> vars = new ArrayList<>();
        int v = 0;
        for (String word : words) {
            if (dict.containsKey(word)) {
                tuple.add(word);
                vars.add(v);
                v = 0;
            } else {
                ++v;
            }
        }
        vars.add(v);
        return new Tuple2<>(tuple, vars);
    });
    // Collects every parsed line of the topic to the driver, so for a 5 GB
    // topic the mining below runs on a single machine.
    List<Tuple2<List<String>, List<Integer>>> tupleVars = tupleVarsRDD.collect();
    LogMining logMining = new LogMining(logMiningOptions);
    Tuple2<List<Candidates>, Integer> patterns = new Tuple2<>(
            logMining.batchProcess(tupleVars, dict), logMining.getLineCount());
    putPatternsToDatabase(logFileDate, key, patterns);
    agentWordsRDD.unpersist();  // release the cached words before the next topic
}
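A minimal sketch of one common way to avoid the per-topic loop, written with plain Java collections rather than Spark so that it runs standalone: count words keyed by (topic, word) over all lines at once, then build each topic's dictionary from those counts. In Spark this would roughly correspond to keeping the data as (topic, line) pairs instead of grouping it into JavaPairRDD<String, Iterable<String>>, emitting ((topic, word), 1) with flatMapToPair, and summing with reduceByKey, which combines counts on the map side before shuffling. The names TopicWordCount, Line, countByTopicAndWord and dictionaries are hypothetical, and whitespace splitting stands in for splitAndParseLogLine.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch: count words for ALL topics in one pass, keyed by
 * (topic, word), instead of running one set of jobs per topic.
 */
class TopicWordCount {

    /** One input record: a topic and one line of its text. */
    static final class Line {
        final String topic;
        final String text;

        Line(String topic, String text) {
            this.topic = topic;
            this.text = text;
        }
    }

    /** Equivalent of mapToPair(((topic, word), 1)) followed by reduceByKey(+). */
    static Map<SimpleEntry<String, String>, Integer> countByTopicAndWord(List<Line> lines) {
        Map<SimpleEntry<String, String>, Integer> counts = new HashMap<>();
        for (Line line : lines) {
            // Whitespace split is an assumption standing in for splitAndParseLogLine.
            for (String word : line.text.split("\\s+")) {
                if (word.isEmpty()) continue;  // skip empty tokens from leading whitespace
                counts.merge(new SimpleEntry<>(line.topic, word), 1, Integer::sum);
            }
        }
        return counts;
    }

    /** Per-topic dictionary: words seen at least minNums times within that topic. */
    static Map<String, Map<String, Integer>> dictionaries(
            Map<SimpleEntry<String, String>, Integer> counts, int minNums) {
        Map<String, Map<String, Integer>> dicts = new HashMap<>();
        counts.forEach((topicAndWord, count) -> {
            if (count >= minNums) {
                dicts.computeIfAbsent(topicAndWord.getKey(), t -> new HashMap<>())
                     .put(topicAndWord.getValue(), count);
            }
        });
        return dicts;
    }

    public static void main(String[] args) {
        List<Line> lines = Arrays.asList(
                new Line("small", "disk full on host a"),
                new Line("big", "user login ok"),
                new Line("big", "user login failed"),
                new Line("big", "user logout ok"));
        Map<String, Map<String, Integer>> dicts = dictionaries(countByTopicAndWord(lines), 2);
        System.out.println(dicts.get("big"));            // user=3, login=2, ok=2 (HashMap order may vary)
        System.out.println(dicts.containsKey("small"));  // false: no word repeats in "small"
    }
}
```

Because every topic's counts come out of the same pass, a 10 KB topic and a 5 GB topic are handled by the same jobs instead of one job sequence per key.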
But as you can see, this still doesn't solve the data skew problem, and since
I generate a separate RDD for each key, there are a lot of redundant
operations. So my questions are: do you have any good advice or examples for
handling this problem? And when the data is skewed, how can I cut it into a
fixed number of evenly sized pieces, instead of sorting by length and then
cutting evenly, as in:

    JavaRDD<String> valuesRDDPartition = valuesRDD.sortBy(new Function<String, Integer>() {
        @Override
        public Integer call(String v1) throws Exception {
            return logMiningOptions.getSplitWord().splitWord(v1).length;
        }
    }, true, 16);

Here 16 is the number of pieces (it could be some other number); it is not
a length.
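For the second question, a common technique is key salting: give a large topic several sub-keys, (topic, 0) through (topic, N-1), assigned by position rather than by line length, aggregate each sub-key separately, and then merge the partial results per topic. The sketch below does this with plain Java collections and assumes a hypothetical target slice size (128 MB in the example) to choose N from a topic's size; SaltedWordCount and its methods are hypothetical names. In Spark the same two stages would be keying records by (topic, salt), aggregating with reduceByKey, then dropping the salt and aggregating again per topic. For a single topic's RDD, valuesRDD.repartition(16) already spreads the lines over 16 partitions of roughly equal record count without sorting. Salting only helps for results that can be merged, such as counts; the post does not say whether LogMining.batchProcess results can be merged across slices.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch of key salting / two-stage aggregation: a topic is cut
 * into N slices by position (not by line length), each slice is counted
 * separately, and the partial counts are merged.
 */
class SaltedWordCount {

    /** Number of slices: about one per targetBytes of text, but at least 1. */
    static int sliceCount(long topicBytes, long targetBytes) {
        return (int) Math.max(1, (topicBytes + targetBytes - 1) / targetBytes);
    }

    /** Stage 1: line i goes to slice i % slices, so slice sizes differ by at most one line. */
    static List<Map<String, Integer>> partialCounts(List<String> lines, int slices) {
        List<Map<String, Integer>> partials = new ArrayList<>();
        for (int s = 0; s < slices; s++) {
            partials.add(new HashMap<>());
        }
        for (int i = 0; i < lines.size(); i++) {
            Map<String, Integer> slice = partials.get(i % slices);
            for (String word : lines.get(i).split("\\s+")) {
                if (!word.isEmpty()) slice.merge(word, 1, Integer::sum);
            }
        }
        return partials;
    }

    /** Stage 2: merge the per-slice counts into one count for the whole topic. */
    static Map<String, Integer> merge(List<Map<String, Integer>> partials) {
        Map<String, Integer> total = new HashMap<>();
        for (Map<String, Integer> partial : partials) {
            partial.forEach((word, count) -> total.merge(word, count, Integer::sum));
        }
        return total;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        System.out.println(sliceCount(10 * 1024L, 128 * mb));     // 1: a 10 KB topic stays in one slice
        System.out.println(sliceCount(5 * 1024 * mb, 128 * mb));  // 40: a 5 GB topic is cut into 40 slices
        List<Map<String, Integer>> partials = partialCounts(Arrays.asList("a b", "a c", "a"), 2);
        System.out.println(merge(partials).get("a"));             // 3
    }
}
```

Sizing N from the topic's size keeps small topics in a single slice, so only the large topics pay for the extra merge step.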
--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/handle-data-skew-problem-when-calculating-word-count-and-word-dependency-tp28068.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.