Posted to dev@phoenix.apache.org by "Quan cheng (JIRA)" <ji...@apache.org> on 2015/04/15 16:11:58 UTC
[jira] [Commented] (PHOENIX-1868) follow phoenix map/reduce sample code encounter cast error
[ https://issues.apache.org/jira/browse/PHOENIX-1868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14496215#comment-14496215 ]
Quan cheng commented on PHOENIX-1868:
-------------------------------------
Further findings show that the reduce job validates the reducer's output parameters only against NullWritable and DBWritable.
So for the sample reduce method at
http://phoenix.apache.org/phoenix_mr.html
public static class StockReducer extends Reducer<Text, DoubleWritable, NullWritable, StockWritable> {
    @Override
    protected void reduce(Text key, Iterable<DoubleWritable> recordings, Context context) throws IOException, InterruptedException {
    ...}
neither the Text key (not a NullWritable) nor the Iterable<DoubleWritable> recordings (not a DBWritable) would pass that validation.
So I doubt the sample code even works for this case.
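Incidentally, one easy way to end up pushing a Text key through PhoenixRecordWriter is a reduce() whose parameter list does not exactly match Hadoop's reduce(KEYIN key, Iterable<VALUEIN> values, Context context): such a method overloads instead of overriding, so Hadoop's default identity reduce runs and forwards the map-output key unchanged. A minimal plain-Java sketch of this overload/override trap (hypothetical stand-in classes, no Hadoop dependency):

```java
// Simplified stand-in for org.apache.hadoop.mapreduce.Reducer (hypothetical
// names; the real signature is reduce(KEYIN, Iterable<VALUEIN>, Context)).
class SketchReducer<KIN, VIN> {
    // Default behavior, like Hadoop's: identity — the input key passes through.
    public String reduce(KIN key, Iterable<VIN> values) {
        return "identity:" + key;
    }
}

class MismatchedReducer extends SketchReducer<String, byte[]> {
    // Takes byte[] instead of Iterable<byte[]>, so this OVERLOADS rather than
    // overrides reduce(); the framework still calls the identity version above.
    // An @Override annotation here would turn the mistake into a compile error.
    public String reduce(String key, byte[] value) {
        return "custom:" + key;
    }
}

public class OverrideTrapDemo {
    public static void main(String[] args) {
        SketchReducer<String, byte[]> r = new MismatchedReducer();
        // Dispatch through the base-class signature, as the framework would:
        System.out.println(r.reduce("k", java.util.Collections.<byte[]>emptyList()));
        // prints "identity:k" — the custom method never runs
    }
}
```

This is only a sketch of the Java dispatch rules, not Phoenix internals, but it is why forgetting Iterable in the reduce signature quietly reverts to identity behavior.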
> follow phoenix map/reduce sample code encounter cast error
> ----------------------------------------------------------
>
> Key: PHOENIX-1868
> URL: https://issues.apache.org/jira/browse/PHOENIX-1868
> Project: Phoenix
> Issue Type: Test
> Affects Versions: 4.2.0
> Environment: hadoop-2.5.2/ hbase-0.98.8-hadoop2 / phoenix 4.3.1
> Reporter: Quan cheng
> Labels: easyfix
> Fix For: 4.2.0
>
>
> I followed the map/reduce sample code to develop my own map/reduce job, and I always encounter a Text cast error:
> 15/04/15 16:57:43 WARN mapred.LocalJobRunner: job_local1114443330_0001
> java.lang.ClassCastException: org.apache.hadoop.io.Text cannot be cast to org.apache.hadoop.io.NullWritable
> at org.apache.phoenix.mapreduce.PhoenixRecordWriter.write(PhoenixRecordWriter.java:39)
> at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:576)
> at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
> at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.write(WrappedReducer.java:105)
> at org.apache.hadoop.mapreduce.Reducer.reduce(Reducer.java:150)
> at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
> at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:645)
> at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:405)
> at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:445)
> 15/04/15 16:57:44 INFO mapreduce.Job: map 100% reduce 0%
> 15/04/15 16:57:44 INFO mapreduce.Job: Job job_local1114443330_0001 failed with state FAILED due to: NA
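A ClassCastException like the one in the trace is simply an unchecked downcast failing on the key the reducer emits. A minimal plain-Java illustration of that failure mode, with no Hadoop dependencies (String stands in for org.apache.hadoop.io.Text, Integer for NullWritable; the real cast happens inside PhoenixRecordWriter.write):

```java
public class CastDemo {
    public static void main(String[] args) {
        // The record writer receives the key as Object (generics are erased at
        // runtime) and downcasts it to the key type it was configured with.
        Object key = "PMR_123";   // stands in for a Text key from the reducer
        try {
            Integer nullKey = (Integer) key;  // stands in for (NullWritable) key
            System.out.println("cast succeeded: " + nullKey);
        } catch (ClassCastException e) {
            // The message names both classes, analogous to
            // "org.apache.hadoop.io.Text cannot be cast to org.apache.hadoop.io.NullWritable"
            System.out.println("ClassCastException: " + e.getMessage());
        }
    }
}
```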
> Here is my code:
> public class PMRAnalyzer {
>
>     public static class PMRAnalyzerReducer extends Reducer<Text, BytesWritable, NullWritable, PMRAnalyzResultWritable> {
>
>         protected void reduce(Text key, BytesWritable findings, Context context) throws IOException, InterruptedException {
>             Text pmr_token_text = new Text();
>             String pmr_token = key.toString();
>             String findings_jsonstr = new String(findings.copyBytes());
>             PMRAnalyzResultWritable result = new PMRAnalyzResultWritable();
>             JSONUtil util = new JSONUtil();
>             PMRAnalyzeResult resultvo = new PMRAnalyzeResult();
>             try {
>                 resultvo = (PMRAnalyzeResult) util.readJsonToVO(findings_jsonstr, PMRAnalyzeResult.class);
>                 result.setProblem_description(new String(Base64.decode(resultvo.getProblem_description())));
>             } catch (Exception e) {
>                 e.printStackTrace();
>             }
>             result.setFindings(findings_jsonstr);
>             //NullWritable nullwritable = NullWritable.get();
>             //System.out.println("findings_jsonstr:"+findings_jsonstr);
>             context.write(NullWritable.get(), result);
>         }
>     }
>
>     public static class PMRAnalyzerMapper extends Mapper<NullWritable, PMRAnalyzResultWritable, Text, BytesWritable> {
>
>         protected void map(NullWritable key, PMRAnalyzResultWritable analyzeResultWritable, Context context) throws IOException, InterruptedException {
>             final String the_pmr_token = analyzeResultWritable.getPMRNO() + "_" + analyzeResultWritable.getCREATED_TIME();
>             final String pmr_text = analyzeResultWritable.getPMR_TEXT();
>             Text pmr_token = new Text();
>             Text findings = new Text();
>             PMRAnalyzResultWritable resultwritable = new PMRAnalyzResultWritable();
>             pmr_token.set(the_pmr_token);
>             //findings.set(findings.toString());
>             PMRParser parser = new PMRParser();
>             String jsonstr = parser.parsePMRText(pmr_text, parser);
>             findings.set(jsonstr);
>             context.write(pmr_token, new BytesWritable(jsonstr.getBytes()));
>         }
>     }
>
>     public static void main(String[] args) {
>         // TODO Auto-generated method stub
>         try {
>             final Configuration configuration = HBaseConfiguration.create();
>             final Job job = Job.getInstance(configuration, "phoenix-mr-job");
>             // We can either specify a selectQuery or ignore it when we would like to retrieve all the columns
>             final String selectQuery = "SELECT PMRNO,CREATED_TIME,PMR_TEXT FROM PMRTEXT ";
>             // StockWritable is the DBWritable class that enables us to process the Result of the above query
>             PhoenixMapReduceUtil.setInput(job, PMRAnalyzResultWritable.class, "PMRTEXT", selectQuery);
>             // Set the target Phoenix table and the columns
>             PhoenixMapReduceUtil.setOutput(job, "PMR_ANALYZER", "PMR_TOKEN,PROBLEM_DESCRIPTION,FINDINGS");
>             job.setMapperClass(PMRAnalyzerMapper.class);
>             job.setReducerClass(PMRAnalyzerReducer.class);
>             job.setOutputFormatClass(PhoenixOutputFormat.class);
>             job.setMapOutputKeyClass(Text.class);
>             job.setMapOutputValueClass(BytesWritable.class);
>             job.setOutputKeyClass(NullWritable.class);
>             job.setOutputValueClass(PMRAnalyzResultWritable.class);
>             TableMapReduceUtil.addDependencyJars(job);
>             job.waitForCompletion(true);
>         } catch (Exception e) {
>             e.printStackTrace();
>         }
>     }
> }
>
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)