You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@hadoop.apache.org by Anseh Danesh <an...@gmail.com> on 2013/10/25 07:46:10 UTC
map phase does not read intermediate results with SequenceFileInputFormat
Hi all.
I have a mapreduce program with two jobs. second job's key and value comes
from first job output. but I think the second map does not get the result
from first job. in other words I think my second job did not read the
output of my first job.. what should I do?
here is the code:
public class dewpoint extends Configured implements Tool
{
private static final Logger logger = LoggerFactory.getLogger(dewpoint.class);
static final String KEYSPACE = "weather";
static final String COLUMN_FAMILY = "user";
private static final String OUTPUT_PATH1 = "/tmp/intermediate1";
private static final String OUTPUT_PATH2 = "/tmp/intermediate2";
private static final String OUTPUT_PATH3 = "/tmp/intermediate3";
private static final String INPUT_PATH1 = "/tmp/intermediate1";
public static void main(String[] args) throws Exception
{
ToolRunner.run(new Configuration(), new dewpoint(), args);
System.exit(0);
}
///////////////////////////////////////////////////////////
public static class dpmap1 extends Mapper<Map<String, ByteBuffer>,
Map<FloatWritable, ByteBuffer>, Text, DoubleWritable>
{
DoubleWritable val1 = new DoubleWritable();
Text word = new Text();
String date;
float temp;
public void map(Map<String, ByteBuffer> keys, Map<FloatWritable,
ByteBuffer> columns, Context context) throws IOException,
InterruptedException
{
for (Entry<String, ByteBuffer> key : keys.entrySet())
{
//System.out.println(key.getKey());
if (!"date".equals(key.getKey()))
continue;
date = ByteBufferUtil.string(key.getValue());
word.set(date);
}
for (Entry<FloatWritable, ByteBuffer> column : columns.entrySet())
{
if (!"temprature".equals(column.getKey()))
continue;
temp = ByteBufferUtil.toFloat(column.getValue());
val1.set(temp);
//System.out.println(temp);
}
context.write(word, val1);
}
}
///////////////////////////////////////////////////////////
public static class dpred1 extends Reducer<Text, DoubleWritable, Text,
DoubleWritable>
{
public void reduce(Text key, Iterable<DoubleWritable> values,
Context context) throws IOException, InterruptedException
{
double beta = 17.62;
double landa = 243.12;
DoubleWritable result1 = new DoubleWritable();
DoubleWritable result2 = new DoubleWritable();
for (DoubleWritable val : values){
// System.out.println(val.get());
beta *= val.get();
landa+=val.get();
}
result1.set(beta);
result2.set(landa);
context.write(key, result1);
context.write(key, result2);
}
}
///////////////////////////////////////////////////////////
public static class dpmap2 extends Mapper <Text, DoubleWritable, Text,
DoubleWritable>{
Text key2 = new Text();
double temp1, temp2 =0;
public void map(Text key, Iterable<DoubleWritable> values, Context
context) throws IOException, InterruptedException {
String[] sp = values.toString().split("\t");
for (int i=0; i< sp.length; i+=4)
//key2.set(sp[i]);
System.out.println(sp[i]);
for(int j=1;j< sp.length; j+=4)
temp1 = Double.valueOf(sp[j]);
for (int k=3;k< sp.length; k+=4)
temp2 = Double.valueOf(sp[k]);
context.write(key2, new DoubleWritable(temp2/temp1));
}
}
///////////////////////////////////////////////////////////
public static class dpred2 extends Reducer<Text, DoubleWritable, Text,
DoubleWritable>
{
public void reduce(Text key, Iterable<DoubleWritable> values,
Context context) throws IOException, InterruptedException
{
double alpha = 6.112;
double tmp = 0;
DoubleWritable result3 = new DoubleWritable();
for (DoubleWritable val : values){
System.out.println(val.get());
tmp = alpha*(Math.pow(Math.E, val.get()));
}
result3.set(tmp);
context.write(key, result3);
}
}
///////////////////////////////////////////////////////////
public int run(String[] args) throws Exception
{
Job job1 = new Job(getConf(), "DewPoint");
job1.setJarByClass(dewpoint.class);
job1.setMapperClass(dpmap1.class);
job1.setOutputFormatClass(SequenceFileOutputFormat.class);
job1.setCombinerClass(dpred1.class);
job1.setReducerClass(dpred1.class);
job1.setMapOutputKeyClass(Text.class);
job1.setMapOutputValueClass(DoubleWritable.class);
job1.setOutputKeyClass(Text.class);
job1.setOutputValueClass(DoubleWritable.class);
FileOutputFormat.setOutputPath(job1, new Path(OUTPUT_PATH1));
job1.setInputFormatClass(CqlPagingInputFormat.class);
ConfigHelper.setInputRpcPort(job1.getConfiguration(), "9160");
ConfigHelper.setInputInitialAddress(job1.getConfiguration(), "localhost");
ConfigHelper.setInputColumnFamily(job1.getConfiguration(),
KEYSPACE, COLUMN_FAMILY);
ConfigHelper.setInputPartitioner(job1.getConfiguration(),
"Murmur3Partitioner");
CqlConfigHelper.setInputCQLPageRowSize(job1.getConfiguration(), "3");
job1.waitForCompletion(true);
/***************************************/
if (job1.isSuccessful()){
Job job2 = new Job(getConf(), "DewPoint");
job2.setJarByClass(dewpoint.class);
job2.setMapperClass(dpmap2.class);
job2.setCombinerClass(dpred2.class);
job2.setReducerClass(dpred2.class);
job2.setMapOutputKeyClass(Text.class);
job2.setMapOutputValueClass(DoubleWritable.class);
job2.setOutputKeyClass(Text.class);
job2.setOutputValueClass(DoubleWritable.class);
job2.setOutputFormatClass(TextOutputFormat.class);
job2.setInputFormatClass(SequenceFileInputFormat.class);
FileInputFormat.addInputPath(job2, new Path(OUTPUT_PATH1));
FileOutputFormat.setOutputPath(job2, new Path(OUTPUT_PATH2));
job2.waitForCompletion(true);
}
///////////////////////////////////////////////////
return 0;
}
}
for example in my second map phase when I do a System.out.println(key)
it does not print any thing and in reduce result the value is
'infinity'....
Re: map phase does not read intermediate results with SequenceFileInputFormat
Posted by Anseh Danesh <an...@gmail.com>.
yes.. thanks for the reply..
On Fri, Oct 25, 2013 at 1:46 PM, Dieter De Witte <dr...@gmail.com> wrote:
> The question is also on stackoverflow, the problem is that she divides by
> zero in the second mapper I think. (the logs show that both jobs have a
> data flow..
>
>
> 2013/10/25 Robin East <ro...@xense.co.uk>
>
>> Hi
>>
>> Are you sure job1 created output where you expected it and in the format
>> you expect? Have you tested job2 on its own with some hand-crafted test
>> data? Is the output from job1 consistent with your hand-crafted test data
>> for job2?
>>
>> regards
>> On 25 Oct 2013, at 06:46, Anseh Danesh <an...@gmail.com> wrote:
>>
>> Hi all.
>>
>> I have a mapreduce program with two jobs. second job's key and value
>> comes from first job output. but I think the second map does not get the
>> result from first job. in other words I think my second job did not read
>> the output of my first job.. what should I do?
>>
>> here is the code:
>>
>> public class dewpoint extends Configured implements Tool
>> {
>> private static final Logger logger = LoggerFactory.getLogger(dewpoint.class);
>>
>> static final String KEYSPACE = "weather";
>> static final String COLUMN_FAMILY = "user";
>> private static final String OUTPUT_PATH1 = "/tmp/intermediate1";
>> private static final String OUTPUT_PATH2 = "/tmp/intermediate2";
>> private static final String OUTPUT_PATH3 = "/tmp/intermediate3";
>> private static final String INPUT_PATH1 = "/tmp/intermediate1";
>>
>> public static void main(String[] args) throws Exception
>> {
>>
>> ToolRunner.run(new Configuration(), new dewpoint(), args);
>> System.exit(0);
>> }
>>
>> ///////////////////////////////////////////////////////////
>>
>> public static class dpmap1 extends Mapper<Map<String, ByteBuffer>, Map<FloatWritable, ByteBuffer>, Text, DoubleWritable>
>> {
>> DoubleWritable val1 = new DoubleWritable();
>> Text word = new Text();
>> String date;
>> float temp;
>> public void map(Map<String, ByteBuffer> keys, Map<FloatWritable, ByteBuffer> columns, Context context) throws IOException, InterruptedException
>> {
>>
>> for (Entry<String, ByteBuffer> key : keys.entrySet())
>> {
>> //System.out.println(key.getKey());
>> if (!"date".equals(key.getKey()))
>> continue;
>> date = ByteBufferUtil.string(key.getValue());
>> word.set(date);
>> }
>>
>>
>> for (Entry<FloatWritable, ByteBuffer> column : columns.entrySet())
>> {
>> if (!"temprature".equals(column.getKey()))
>> continue;
>> temp = ByteBufferUtil.toFloat(column.getValue());
>> val1.set(temp);
>> //System.out.println(temp);
>> }
>> context.write(word, val1);
>> }
>> }
>>
>> ///////////////////////////////////////////////////////////
>>
>> public static class dpred1 extends Reducer<Text, DoubleWritable, Text, DoubleWritable>
>> {
>> public void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException
>> {
>> double beta = 17.62;
>> double landa = 243.12;
>> DoubleWritable result1 = new DoubleWritable();
>> DoubleWritable result2 = new DoubleWritable();
>> for (DoubleWritable val : values){
>> // System.out.println(val.get());
>> beta *= val.get();
>> landa+=val.get();
>> }
>> result1.set(beta);
>> result2.set(landa);
>>
>> context.write(key, result1);
>> context.write(key, result2);
>> }
>> }
>> ///////////////////////////////////////////////////////////
>>
>> public static class dpmap2 extends Mapper <Text, DoubleWritable, Text, DoubleWritable>{
>>
>> Text key2 = new Text();
>> double temp1, temp2 =0;
>>
>> public void map(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
>> String[] sp = values.toString().split("\t");
>> for (int i=0; i< sp.length; i+=4)
>> //key2.set(sp[i]);
>> System.out.println(sp[i]);
>> for(int j=1;j< sp.length; j+=4)
>> temp1 = Double.valueOf(sp[j]);
>> for (int k=3;k< sp.length; k+=4)
>> temp2 = Double.valueOf(sp[k]);
>> context.write(key2, new DoubleWritable(temp2/temp1));
>>
>> }
>> }
>>
>> ///////////////////////////////////////////////////////////
>>
>>
>> public static class dpred2 extends Reducer<Text, DoubleWritable, Text, DoubleWritable>
>> {
>> public void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException
>> {
>>
>> double alpha = 6.112;
>> double tmp = 0;
>> DoubleWritable result3 = new DoubleWritable();
>> for (DoubleWritable val : values){
>> System.out.println(val.get());
>> tmp = alpha*(Math.pow(Math.E, val.get()));
>>
>> }
>> result3.set(tmp);
>> context.write(key, result3);
>>
>>
>> }
>> }
>>
>>
>> ///////////////////////////////////////////////////////////
>>
>>
>> public int run(String[] args) throws Exception
>> {
>>
>> Job job1 = new Job(getConf(), "DewPoint");
>> job1.setJarByClass(dewpoint.class);
>> job1.setMapperClass(dpmap1.class);
>> job1.setOutputFormatClass(SequenceFileOutputFormat.class);
>> job1.setCombinerClass(dpred1.class);
>> job1.setReducerClass(dpred1.class);
>> job1.setMapOutputKeyClass(Text.class);
>> job1.setMapOutputValueClass(DoubleWritable.class);
>> job1.setOutputKeyClass(Text.class);
>> job1.setOutputValueClass(DoubleWritable.class);
>> FileOutputFormat.setOutputPath(job1, new Path(OUTPUT_PATH1));
>>
>>
>> job1.setInputFormatClass(CqlPagingInputFormat.class);
>>
>> ConfigHelper.setInputRpcPort(job1.getConfiguration(), "9160");
>> ConfigHelper.setInputInitialAddress(job1.getConfiguration(), "localhost");
>> ConfigHelper.setInputColumnFamily(job1.getConfiguration(), KEYSPACE, COLUMN_FAMILY);
>> ConfigHelper.setInputPartitioner(job1.getConfiguration(), "Murmur3Partitioner");
>>
>> CqlConfigHelper.setInputCQLPageRowSize(job1.getConfiguration(), "3");
>> job1.waitForCompletion(true);
>>
>> /***************************************/
>>
>> if (job1.isSuccessful()){
>> Job job2 = new Job(getConf(), "DewPoint");
>> job2.setJarByClass(dewpoint.class);
>> job2.setMapperClass(dpmap2.class);
>> job2.setCombinerClass(dpred2.class);
>> job2.setReducerClass(dpred2.class);
>> job2.setMapOutputKeyClass(Text.class);
>> job2.setMapOutputValueClass(DoubleWritable.class);
>> job2.setOutputKeyClass(Text.class);
>> job2.setOutputValueClass(DoubleWritable.class);
>> job2.setOutputFormatClass(TextOutputFormat.class);
>> job2.setInputFormatClass(SequenceFileInputFormat.class);
>> FileInputFormat.addInputPath(job2, new Path(OUTPUT_PATH1));
>> FileOutputFormat.setOutputPath(job2, new Path(OUTPUT_PATH2));
>> job2.waitForCompletion(true);
>> }
>> ///////////////////////////////////////////////////
>>
>> return 0;
>> }
>> }
>>
>> for example in my second map phase when I do a System.out.println(key) it does not print any thing and in reduce result the value is 'infinity'....
>>
>>
>>
>
Re: map phase does not read intermediate results with SequenceFileInputFormat
Posted by Anseh Danesh <an...@gmail.com>.
yes.. thanks for the reply..
On Fri, Oct 25, 2013 at 1:46 PM, Dieter De Witte <dr...@gmail.com> wrote:
> The question is also on stackoverflow, the problem is that she divides by
> zero in the second mapper I think. (the logs show that both jobs have a
> data flow..
>
>
> 2013/10/25 Robin East <ro...@xense.co.uk>
>
>> Hi
>>
>> Are you sure job1 created output where you expected it and in the format
>> you expect? Have you tested job2 on its own with some hand-crafted test
>> data? Is the output from job1 consistent with your hand-crafted test data
>> for job2?
>>
>> regards
>> On 25 Oct 2013, at 06:46, Anseh Danesh <an...@gmail.com> wrote:
>>
>> Hi all.
>>
>> I have a mapreduce program with two jobs. second job's key and value
>> comes from first job output. but I think the second map does not get the
>> result from first job. in other words I think my second job did not read
>> the output of my first job.. what should I do?
>>
>> here is the code:
>>
>> public class dewpoint extends Configured implements Tool
>> {
>> private static final Logger logger = LoggerFactory.getLogger(dewpoint.class);
>>
>> static final String KEYSPACE = "weather";
>> static final String COLUMN_FAMILY = "user";
>> private static final String OUTPUT_PATH1 = "/tmp/intermediate1";
>> private static final String OUTPUT_PATH2 = "/tmp/intermediate2";
>> private static final String OUTPUT_PATH3 = "/tmp/intermediate3";
>> private static final String INPUT_PATH1 = "/tmp/intermediate1";
>>
>> public static void main(String[] args) throws Exception
>> {
>>
>> ToolRunner.run(new Configuration(), new dewpoint(), args);
>> System.exit(0);
>> }
>>
>> ///////////////////////////////////////////////////////////
>>
>> public static class dpmap1 extends Mapper<Map<String, ByteBuffer>, Map<FloatWritable, ByteBuffer>, Text, DoubleWritable>
>> {
>> DoubleWritable val1 = new DoubleWritable();
>> Text word = new Text();
>> String date;
>> float temp;
>> public void map(Map<String, ByteBuffer> keys, Map<FloatWritable, ByteBuffer> columns, Context context) throws IOException, InterruptedException
>> {
>>
>> for (Entry<String, ByteBuffer> key : keys.entrySet())
>> {
>> //System.out.println(key.getKey());
>> if (!"date".equals(key.getKey()))
>> continue;
>> date = ByteBufferUtil.string(key.getValue());
>> word.set(date);
>> }
>>
>>
>> for (Entry<FloatWritable, ByteBuffer> column : columns.entrySet())
>> {
>> if (!"temprature".equals(column.getKey()))
>> continue;
>> temp = ByteBufferUtil.toFloat(column.getValue());
>> val1.set(temp);
>> //System.out.println(temp);
>> }
>> context.write(word, val1);
>> }
>> }
>>
>> ///////////////////////////////////////////////////////////
>>
>> public static class dpred1 extends Reducer<Text, DoubleWritable, Text, DoubleWritable>
>> {
>> public void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException
>> {
>> double beta = 17.62;
>> double landa = 243.12;
>> DoubleWritable result1 = new DoubleWritable();
>> DoubleWritable result2 = new DoubleWritable();
>> for (DoubleWritable val : values){
>> // System.out.println(val.get());
>> beta *= val.get();
>> landa+=val.get();
>> }
>> result1.set(beta);
>> result2.set(landa);
>>
>> context.write(key, result1);
>> context.write(key, result2);
>> }
>> }
>> ///////////////////////////////////////////////////////////
>>
>> public static class dpmap2 extends Mapper <Text, DoubleWritable, Text, DoubleWritable>{
>>
>> Text key2 = new Text();
>> double temp1, temp2 =0;
>>
>> public void map(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
>> String[] sp = values.toString().split("\t");
>> for (int i=0; i< sp.length; i+=4)
>> //key2.set(sp[i]);
>> System.out.println(sp[i]);
>> for(int j=1;j< sp.length; j+=4)
>> temp1 = Double.valueOf(sp[j]);
>> for (int k=3;k< sp.length; k+=4)
>> temp2 = Double.valueOf(sp[k]);
>> context.write(key2, new DoubleWritable(temp2/temp1));
>>
>> }
>> }
>>
>> ///////////////////////////////////////////////////////////
>>
>>
>> public static class dpred2 extends Reducer<Text, DoubleWritable, Text, DoubleWritable>
>> {
>> public void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException
>> {
>>
>> double alpha = 6.112;
>> double tmp = 0;
>> DoubleWritable result3 = new DoubleWritable();
>> for (DoubleWritable val : values){
>> System.out.println(val.get());
>> tmp = alpha*(Math.pow(Math.E, val.get()));
>>
>> }
>> result3.set(tmp);
>> context.write(key, result3);
>>
>>
>> }
>> }
>>
>>
>> ///////////////////////////////////////////////////////////
>>
>>
>> public int run(String[] args) throws Exception
>> {
>>
>> Job job1 = new Job(getConf(), "DewPoint");
>> job1.setJarByClass(dewpoint.class);
>> job1.setMapperClass(dpmap1.class);
>> job1.setOutputFormatClass(SequenceFileOutputFormat.class);
>> job1.setCombinerClass(dpred1.class);
>> job1.setReducerClass(dpred1.class);
>> job1.setMapOutputKeyClass(Text.class);
>> job1.setMapOutputValueClass(DoubleWritable.class);
>> job1.setOutputKeyClass(Text.class);
>> job1.setOutputValueClass(DoubleWritable.class);
>> FileOutputFormat.setOutputPath(job1, new Path(OUTPUT_PATH1));
>>
>>
>> job1.setInputFormatClass(CqlPagingInputFormat.class);
>>
>> ConfigHelper.setInputRpcPort(job1.getConfiguration(), "9160");
>> ConfigHelper.setInputInitialAddress(job1.getConfiguration(), "localhost");
>> ConfigHelper.setInputColumnFamily(job1.getConfiguration(), KEYSPACE, COLUMN_FAMILY);
>> ConfigHelper.setInputPartitioner(job1.getConfiguration(), "Murmur3Partitioner");
>>
>> CqlConfigHelper.setInputCQLPageRowSize(job1.getConfiguration(), "3");
>> job1.waitForCompletion(true);
>>
>> /***************************************/
>>
>> if (job1.isSuccessful()){
>> Job job2 = new Job(getConf(), "DewPoint");
>> job2.setJarByClass(dewpoint.class);
>> job2.setMapperClass(dpmap2.class);
>> job2.setCombinerClass(dpred2.class);
>> job2.setReducerClass(dpred2.class);
>> job2.setMapOutputKeyClass(Text.class);
>> job2.setMapOutputValueClass(DoubleWritable.class);
>> job2.setOutputKeyClass(Text.class);
>> job2.setOutputValueClass(DoubleWritable.class);
>> job2.setOutputFormatClass(TextOutputFormat.class);
>> job2.setInputFormatClass(SequenceFileInputFormat.class);
>> FileInputFormat.addInputPath(job2, new Path(OUTPUT_PATH1));
>> FileOutputFormat.setOutputPath(job2, new Path(OUTPUT_PATH2));
>> job2.waitForCompletion(true);
>> }
>> ///////////////////////////////////////////////////
>>
>> return 0;
>> }
>> }
>>
>> for example in my second map phase when I do a System.out.println(key) it does not print any thing and in reduce result the value is 'infinity'....
>>
>>
>>
>
Re: map phase does not read intermediate results with SequenceFileInputFormat
Posted by Anseh Danesh <an...@gmail.com>.
yes.. thanks for the reply..
On Fri, Oct 25, 2013 at 1:46 PM, Dieter De Witte <dr...@gmail.com> wrote:
> The question is also on stackoverflow, the problem is that she divides by
> zero in the second mapper I think. (the logs show that both jobs have a
> data flow..
>
>
> 2013/10/25 Robin East <ro...@xense.co.uk>
>
>> Hi
>>
>> Are you sure job1 created output where you expected it and in the format
>> you expect? Have you tested job2 on its own with some hand-crafted test
>> data? Is the output from job1 consistent with your hand-crafted test data
>> for job2?
>>
>> regards
>> On 25 Oct 2013, at 06:46, Anseh Danesh <an...@gmail.com> wrote:
>>
>> Hi all.
>>
>> I have a mapreduce program with two jobs. second job's key and value
>> comes from first job output. but I think the second map does not get the
>> result from first job. in other words I think my second job did not read
>> the output of my first job.. what should I do?
>>
>> here is the code:
>>
>> public class dewpoint extends Configured implements Tool
>> {
>> private static final Logger logger = LoggerFactory.getLogger(dewpoint.class);
>>
>> static final String KEYSPACE = "weather";
>> static final String COLUMN_FAMILY = "user";
>> private static final String OUTPUT_PATH1 = "/tmp/intermediate1";
>> private static final String OUTPUT_PATH2 = "/tmp/intermediate2";
>> private static final String OUTPUT_PATH3 = "/tmp/intermediate3";
>> private static final String INPUT_PATH1 = "/tmp/intermediate1";
>>
>> public static void main(String[] args) throws Exception
>> {
>>
>> ToolRunner.run(new Configuration(), new dewpoint(), args);
>> System.exit(0);
>> }
>>
>> ///////////////////////////////////////////////////////////
>>
>> public static class dpmap1 extends Mapper<Map<String, ByteBuffer>, Map<FloatWritable, ByteBuffer>, Text, DoubleWritable>
>> {
>> DoubleWritable val1 = new DoubleWritable();
>> Text word = new Text();
>> String date;
>> float temp;
>> public void map(Map<String, ByteBuffer> keys, Map<FloatWritable, ByteBuffer> columns, Context context) throws IOException, InterruptedException
>> {
>>
>> for (Entry<String, ByteBuffer> key : keys.entrySet())
>> {
>> //System.out.println(key.getKey());
>> if (!"date".equals(key.getKey()))
>> continue;
>> date = ByteBufferUtil.string(key.getValue());
>> word.set(date);
>> }
>>
>>
>> for (Entry<FloatWritable, ByteBuffer> column : columns.entrySet())
>> {
>> if (!"temprature".equals(column.getKey()))
>> continue;
>> temp = ByteBufferUtil.toFloat(column.getValue());
>> val1.set(temp);
>> //System.out.println(temp);
>> }
>> context.write(word, val1);
>> }
>> }
>>
>> ///////////////////////////////////////////////////////////
>>
>> public static class dpred1 extends Reducer<Text, DoubleWritable, Text, DoubleWritable>
>> {
>> public void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException
>> {
>> double beta = 17.62;
>> double landa = 243.12;
>> DoubleWritable result1 = new DoubleWritable();
>> DoubleWritable result2 = new DoubleWritable();
>> for (DoubleWritable val : values){
>> // System.out.println(val.get());
>> beta *= val.get();
>> landa+=val.get();
>> }
>> result1.set(beta);
>> result2.set(landa);
>>
>> context.write(key, result1);
>> context.write(key, result2);
>> }
>> }
>> ///////////////////////////////////////////////////////////
>>
>> public static class dpmap2 extends Mapper <Text, DoubleWritable, Text, DoubleWritable>{
>>
>> Text key2 = new Text();
>> double temp1, temp2 =0;
>>
>> public void map(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
>> String[] sp = values.toString().split("\t");
>> for (int i=0; i< sp.length; i+=4)
>> //key2.set(sp[i]);
>> System.out.println(sp[i]);
>> for(int j=1;j< sp.length; j+=4)
>> temp1 = Double.valueOf(sp[j]);
>> for (int k=3;k< sp.length; k+=4)
>> temp2 = Double.valueOf(sp[k]);
>> context.write(key2, new DoubleWritable(temp2/temp1));
>>
>> }
>> }
>>
>> ///////////////////////////////////////////////////////////
>>
>>
>> public static class dpred2 extends Reducer<Text, DoubleWritable, Text, DoubleWritable>
>> {
>> public void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException
>> {
>>
>> double alpha = 6.112;
>> double tmp = 0;
>> DoubleWritable result3 = new DoubleWritable();
>> for (DoubleWritable val : values){
>> System.out.println(val.get());
>> tmp = alpha*(Math.pow(Math.E, val.get()));
>>
>> }
>> result3.set(tmp);
>> context.write(key, result3);
>>
>>
>> }
>> }
>>
>>
>> ///////////////////////////////////////////////////////////
>>
>>
>> public int run(String[] args) throws Exception
>> {
>>
>> Job job1 = new Job(getConf(), "DewPoint");
>> job1.setJarByClass(dewpoint.class);
>> job1.setMapperClass(dpmap1.class);
>> job1.setOutputFormatClass(SequenceFileOutputFormat.class);
>> job1.setCombinerClass(dpred1.class);
>> job1.setReducerClass(dpred1.class);
>> job1.setMapOutputKeyClass(Text.class);
>> job1.setMapOutputValueClass(DoubleWritable.class);
>> job1.setOutputKeyClass(Text.class);
>> job1.setOutputValueClass(DoubleWritable.class);
>> FileOutputFormat.setOutputPath(job1, new Path(OUTPUT_PATH1));
>>
>>
>> job1.setInputFormatClass(CqlPagingInputFormat.class);
>>
>> ConfigHelper.setInputRpcPort(job1.getConfiguration(), "9160");
>> ConfigHelper.setInputInitialAddress(job1.getConfiguration(), "localhost");
>> ConfigHelper.setInputColumnFamily(job1.getConfiguration(), KEYSPACE, COLUMN_FAMILY);
>> ConfigHelper.setInputPartitioner(job1.getConfiguration(), "Murmur3Partitioner");
>>
>> CqlConfigHelper.setInputCQLPageRowSize(job1.getConfiguration(), "3");
>> job1.waitForCompletion(true);
>>
>> /***************************************/
>>
>> if (job1.isSuccessful()){
>> Job job2 = new Job(getConf(), "DewPoint");
>> job2.setJarByClass(dewpoint.class);
>> job2.setMapperClass(dpmap2.class);
>> job2.setCombinerClass(dpred2.class);
>> job2.setReducerClass(dpred2.class);
>> job2.setMapOutputKeyClass(Text.class);
>> job2.setMapOutputValueClass(DoubleWritable.class);
>> job2.setOutputKeyClass(Text.class);
>> job2.setOutputValueClass(DoubleWritable.class);
>> job2.setOutputFormatClass(TextOutputFormat.class);
>> job2.setInputFormatClass(SequenceFileInputFormat.class);
>> FileInputFormat.addInputPath(job2, new Path(OUTPUT_PATH1));
>> FileOutputFormat.setOutputPath(job2, new Path(OUTPUT_PATH2));
>> job2.waitForCompletion(true);
>> }
>> ///////////////////////////////////////////////////
>>
>> return 0;
>> }
>> }
>>
>> for example in my second map phase when I do a System.out.println(key) it does not print any thing and in reduce result the value is 'infinity'....
>>
>>
>>
>
Re: map phase does not read intermediate results with SequenceFileInputFormat
Posted by Anseh Danesh <an...@gmail.com>.
yes.. thanks for the reply..
On Fri, Oct 25, 2013 at 1:46 PM, Dieter De Witte <dr...@gmail.com> wrote:
> The question is also on stackoverflow, the problem is that she divides by
> zero in the second mapper I think. (the logs show that both jobs have a
> data flow..
>
>
> 2013/10/25 Robin East <ro...@xense.co.uk>
>
>> Hi
>>
>> Are you sure job1 created output where you expected it and in the format
>> you expect? Have you tested job2 on its own with some hand-crafted test
>> data? Is the output from job1 consistent with your hand-crafted test data
>> for job2?
>>
>> regards
>> On 25 Oct 2013, at 06:46, Anseh Danesh <an...@gmail.com> wrote:
>>
>> Hi all.
>>
>> I have a mapreduce program with two jobs. second job's key and value
>> comes from first job output. but I think the second map does not get the
>> result from first job. in other words I think my second job did not read
>> the output of my first job.. what should I do?
>>
>> here is the code:
>>
>> public class dewpoint extends Configured implements Tool
>> {
>> private static final Logger logger = LoggerFactory.getLogger(dewpoint.class);
>>
>> static final String KEYSPACE = "weather";
>> static final String COLUMN_FAMILY = "user";
>> private static final String OUTPUT_PATH1 = "/tmp/intermediate1";
>> private static final String OUTPUT_PATH2 = "/tmp/intermediate2";
>> private static final String OUTPUT_PATH3 = "/tmp/intermediate3";
>> private static final String INPUT_PATH1 = "/tmp/intermediate1";
>>
>> public static void main(String[] args) throws Exception
>> {
>>
>> ToolRunner.run(new Configuration(), new dewpoint(), args);
>> System.exit(0);
>> }
>>
>> ///////////////////////////////////////////////////////////
>>
>> public static class dpmap1 extends Mapper<Map<String, ByteBuffer>, Map<FloatWritable, ByteBuffer>, Text, DoubleWritable>
>> {
>> DoubleWritable val1 = new DoubleWritable();
>> Text word = new Text();
>> String date;
>> float temp;
>> public void map(Map<String, ByteBuffer> keys, Map<FloatWritable, ByteBuffer> columns, Context context) throws IOException, InterruptedException
>> {
>>
>> for (Entry<String, ByteBuffer> key : keys.entrySet())
>> {
>> //System.out.println(key.getKey());
>> if (!"date".equals(key.getKey()))
>> continue;
>> date = ByteBufferUtil.string(key.getValue());
>> word.set(date);
>> }
>>
>>
>> for (Entry<FloatWritable, ByteBuffer> column : columns.entrySet())
>> {
>> if (!"temprature".equals(column.getKey()))
>> continue;
>> temp = ByteBufferUtil.toFloat(column.getValue());
>> val1.set(temp);
>> //System.out.println(temp);
>> }
>> context.write(word, val1);
>> }
>> }
>>
>> ///////////////////////////////////////////////////////////
>>
>> public static class dpred1 extends Reducer<Text, DoubleWritable, Text, DoubleWritable>
>> {
>> public void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException
>> {
>> double beta = 17.62;
>> double landa = 243.12;
>> DoubleWritable result1 = new DoubleWritable();
>> DoubleWritable result2 = new DoubleWritable();
>> for (DoubleWritable val : values){
>> // System.out.println(val.get());
>> beta *= val.get();
>> landa+=val.get();
>> }
>> result1.set(beta);
>> result2.set(landa);
>>
>> context.write(key, result1);
>> context.write(key, result2);
>> }
>> }
>> ///////////////////////////////////////////////////////////
>>
>> public static class dpmap2 extends Mapper <Text, DoubleWritable, Text, DoubleWritable>{
>>
>> Text key2 = new Text();
>> double temp1, temp2 =0;
>>
>> public void map(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
>> String[] sp = values.toString().split("\t");
>> for (int i=0; i< sp.length; i+=4)
>> //key2.set(sp[i]);
>> System.out.println(sp[i]);
>> for(int j=1;j< sp.length; j+=4)
>> temp1 = Double.valueOf(sp[j]);
>> for (int k=3;k< sp.length; k+=4)
>> temp2 = Double.valueOf(sp[k]);
>> context.write(key2, new DoubleWritable(temp2/temp1));
>>
>> }
>> }
>>
>> ///////////////////////////////////////////////////////////
>>
>>
>> public static class dpred2 extends Reducer<Text, DoubleWritable, Text, DoubleWritable>
>> {
>> public void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException
>> {
>>
>> double alpha = 6.112;
>> double tmp = 0;
>> DoubleWritable result3 = new DoubleWritable();
>> for (DoubleWritable val : values){
>> System.out.println(val.get());
>> tmp = alpha*(Math.pow(Math.E, val.get()));
>>
>> }
>> result3.set(tmp);
>> context.write(key, result3);
>>
>>
>> }
>> }
>>
>>
>> ///////////////////////////////////////////////////////////
>>
>>
>> public int run(String[] args) throws Exception
>> {
>>
>> Job job1 = new Job(getConf(), "DewPoint");
>> job1.setJarByClass(dewpoint.class);
>> job1.setMapperClass(dpmap1.class);
>> job1.setOutputFormatClass(SequenceFileOutputFormat.class);
>> job1.setCombinerClass(dpred1.class);
>> job1.setReducerClass(dpred1.class);
>> job1.setMapOutputKeyClass(Text.class);
>> job1.setMapOutputValueClass(DoubleWritable.class);
>> job1.setOutputKeyClass(Text.class);
>> job1.setOutputValueClass(DoubleWritable.class);
>> FileOutputFormat.setOutputPath(job1, new Path(OUTPUT_PATH1));
>>
>>
>> job1.setInputFormatClass(CqlPagingInputFormat.class);
>>
>> ConfigHelper.setInputRpcPort(job1.getConfiguration(), "9160");
>> ConfigHelper.setInputInitialAddress(job1.getConfiguration(), "localhost");
>> ConfigHelper.setInputColumnFamily(job1.getConfiguration(), KEYSPACE, COLUMN_FAMILY);
>> ConfigHelper.setInputPartitioner(job1.getConfiguration(), "Murmur3Partitioner");
>>
>> CqlConfigHelper.setInputCQLPageRowSize(job1.getConfiguration(), "3");
>> job1.waitForCompletion(true);
>>
>> /***************************************/
>>
>> if (job1.isSuccessful()){
>> Job job2 = new Job(getConf(), "DewPoint");
>> job2.setJarByClass(dewpoint.class);
>> job2.setMapperClass(dpmap2.class);
>> job2.setCombinerClass(dpred2.class);
>> job2.setReducerClass(dpred2.class);
>> job2.setMapOutputKeyClass(Text.class);
>> job2.setMapOutputValueClass(DoubleWritable.class);
>> job2.setOutputKeyClass(Text.class);
>> job2.setOutputValueClass(DoubleWritable.class);
>> job2.setOutputFormatClass(TextOutputFormat.class);
>> job2.setInputFormatClass(SequenceFileInputFormat.class);
>> FileInputFormat.addInputPath(job2, new Path(OUTPUT_PATH1));
>> FileOutputFormat.setOutputPath(job2, new Path(OUTPUT_PATH2));
>> job2.waitForCompletion(true);
>> }
>> ///////////////////////////////////////////////////
>>
>> return 0;
>> }
>> }
>>
>> for example in my second map phase when I do a System.out.println(key) it does not print any thing and in reduce result the value is 'infinity'....
>>
>>
>>
>
Re: map phase does not read intermediate results with SequenceFileInputFormat
Posted by Dieter De Witte <dr...@gmail.com>.
The question is also on stackoverflow, the problem is that she divides by
zero in the second mapper I think. (the logs show that both jobs have a
data flow..
2013/10/25 Robin East <ro...@xense.co.uk>
> Hi
>
> Are you sure job1 created output where you expected it and in the format
> you expect? Have you tested job2 on its own with some hand-crafted test
> data? Is the output from job1 consistent with your hand-crafted test data
> for job2?
>
> regards
> On 25 Oct 2013, at 06:46, Anseh Danesh <an...@gmail.com> wrote:
>
> Hi all.
>
> I have a mapreduce program with two jobs. second job's key and value comes
> from first job output. but I think the second map does not get the result
> from first job. in other words I think my second job did not read the
> output of my first job.. what should I do?
>
> here is the code:
>
> public class dewpoint extends Configured implements Tool
> {
> private static final Logger logger = LoggerFactory.getLogger(dewpoint.class);
>
> static final String KEYSPACE = "weather";
> static final String COLUMN_FAMILY = "user";
> private static final String OUTPUT_PATH1 = "/tmp/intermediate1";
> private static final String OUTPUT_PATH2 = "/tmp/intermediate2";
> private static final String OUTPUT_PATH3 = "/tmp/intermediate3";
> private static final String INPUT_PATH1 = "/tmp/intermediate1";
>
> public static void main(String[] args) throws Exception
> {
>
> ToolRunner.run(new Configuration(), new dewpoint(), args);
> System.exit(0);
> }
>
> ///////////////////////////////////////////////////////////
>
> public static class dpmap1 extends Mapper<Map<String, ByteBuffer>, Map<FloatWritable, ByteBuffer>, Text, DoubleWritable>
> {
> DoubleWritable val1 = new DoubleWritable();
> Text word = new Text();
> String date;
> float temp;
> public void map(Map<String, ByteBuffer> keys, Map<FloatWritable, ByteBuffer> columns, Context context) throws IOException, InterruptedException
> {
>
> for (Entry<String, ByteBuffer> key : keys.entrySet())
> {
> //System.out.println(key.getKey());
> if (!"date".equals(key.getKey()))
> continue;
> date = ByteBufferUtil.string(key.getValue());
> word.set(date);
> }
>
>
> for (Entry<FloatWritable, ByteBuffer> column : columns.entrySet())
> {
> if (!"temprature".equals(column.getKey()))
> continue;
> temp = ByteBufferUtil.toFloat(column.getValue());
> val1.set(temp);
> //System.out.println(temp);
> }
> context.write(word, val1);
> }
> }
>
> ///////////////////////////////////////////////////////////
>
> public static class dpred1 extends Reducer<Text, DoubleWritable, Text, DoubleWritable>
> {
> public void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException
> {
> double beta = 17.62;
> double landa = 243.12;
> DoubleWritable result1 = new DoubleWritable();
> DoubleWritable result2 = new DoubleWritable();
> for (DoubleWritable val : values){
> // System.out.println(val.get());
> beta *= val.get();
> landa+=val.get();
> }
> result1.set(beta);
> result2.set(landa);
>
> context.write(key, result1);
> context.write(key, result2);
> }
> }
> ///////////////////////////////////////////////////////////
>
> public static class dpmap2 extends Mapper <Text, DoubleWritable, Text, DoubleWritable>{
>
> Text key2 = new Text();
> double temp1, temp2 =0;
>
> public void map(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
> String[] sp = values.toString().split("\t");
> for (int i=0; i< sp.length; i+=4)
> //key2.set(sp[i]);
> System.out.println(sp[i]);
> for(int j=1;j< sp.length; j+=4)
> temp1 = Double.valueOf(sp[j]);
> for (int k=3;k< sp.length; k+=4)
> temp2 = Double.valueOf(sp[k]);
> context.write(key2, new DoubleWritable(temp2/temp1));
>
> }
> }
>
> ///////////////////////////////////////////////////////////
>
>
> public static class dpred2 extends Reducer<Text, DoubleWritable, Text, DoubleWritable>
> {
> public void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException
> {
>
> double alpha = 6.112;
> double tmp = 0;
> DoubleWritable result3 = new DoubleWritable();
> for (DoubleWritable val : values){
> System.out.println(val.get());
> tmp = alpha*(Math.pow(Math.E, val.get()));
>
> }
> result3.set(tmp);
> context.write(key, result3);
>
>
> }
> }
>
>
> ///////////////////////////////////////////////////////////
>
>
> public int run(String[] args) throws Exception
> {
>
> Job job1 = new Job(getConf(), "DewPoint");
> job1.setJarByClass(dewpoint.class);
> job1.setMapperClass(dpmap1.class);
> job1.setOutputFormatClass(SequenceFileOutputFormat.class);
> job1.setCombinerClass(dpred1.class);
> job1.setReducerClass(dpred1.class);
> job1.setMapOutputKeyClass(Text.class);
> job1.setMapOutputValueClass(DoubleWritable.class);
> job1.setOutputKeyClass(Text.class);
> job1.setOutputValueClass(DoubleWritable.class);
> FileOutputFormat.setOutputPath(job1, new Path(OUTPUT_PATH1));
>
>
> job1.setInputFormatClass(CqlPagingInputFormat.class);
>
> ConfigHelper.setInputRpcPort(job1.getConfiguration(), "9160");
> ConfigHelper.setInputInitialAddress(job1.getConfiguration(), "localhost");
> ConfigHelper.setInputColumnFamily(job1.getConfiguration(), KEYSPACE, COLUMN_FAMILY);
> ConfigHelper.setInputPartitioner(job1.getConfiguration(), "Murmur3Partitioner");
>
> CqlConfigHelper.setInputCQLPageRowSize(job1.getConfiguration(), "3");
> job1.waitForCompletion(true);
>
> /***************************************/
>
> if (job1.isSuccessful()){
> Job job2 = new Job(getConf(), "DewPoint");
> job2.setJarByClass(dewpoint.class);
> job2.setMapperClass(dpmap2.class);
> job2.setCombinerClass(dpred2.class);
> job2.setReducerClass(dpred2.class);
> job2.setMapOutputKeyClass(Text.class);
> job2.setMapOutputValueClass(DoubleWritable.class);
> job2.setOutputKeyClass(Text.class);
> job2.setOutputValueClass(DoubleWritable.class);
> job2.setOutputFormatClass(TextOutputFormat.class);
> job2.setInputFormatClass(SequenceFileInputFormat.class);
> FileInputFormat.addInputPath(job2, new Path(OUTPUT_PATH1));
> FileOutputFormat.setOutputPath(job2, new Path(OUTPUT_PATH2));
> job2.waitForCompletion(true);
> }
> ///////////////////////////////////////////////////
>
> return 0;
> }
> }
>
> for example in my second map phase when I do a System.out.println(key) it does not print any thing and in reduce result the value is 'infinity'....
>
>
>
Re: map phase does not read intermediate results with SequenceFileInputFormat
Posted by Dieter De Witte <dr...@gmail.com>.
The question is also on stackoverflow, the problem is that she divides by
zero in the second mapper I think. (the logs show that both jobs have a
data flow..
2013/10/25 Robin East <ro...@xense.co.uk>
> Hi
>
> Are you sure job1 created output where you expected it and in the format
> you expect? Have you tested job2 on its own with some hand-crafted test
> data? Is the output from job1 consistent with your hand-crafted test data
> for job2?
>
> regards
> On 25 Oct 2013, at 06:46, Anseh Danesh <an...@gmail.com> wrote:
>
> Hi all.
>
> I have a mapreduce program with two jobs. second job's key and value comes
> from first job output. but I think the second map does not get the result
> from first job. in other words I think my second job did not read the
> output of my first job.. what should I do?
>
> here is the code:
>
> public class dewpoint extends Configured implements Tool
> {
> private static final Logger logger = LoggerFactory.getLogger(dewpoint.class);
>
> static final String KEYSPACE = "weather";
> static final String COLUMN_FAMILY = "user";
> private static final String OUTPUT_PATH1 = "/tmp/intermediate1";
> private static final String OUTPUT_PATH2 = "/tmp/intermediate2";
> private static final String OUTPUT_PATH3 = "/tmp/intermediate3";
> private static final String INPUT_PATH1 = "/tmp/intermediate1";
>
> public static void main(String[] args) throws Exception
> {
>
> ToolRunner.run(new Configuration(), new dewpoint(), args);
> System.exit(0);
> }
>
> ///////////////////////////////////////////////////////////
>
> public static class dpmap1 extends Mapper<Map<String, ByteBuffer>, Map<FloatWritable, ByteBuffer>, Text, DoubleWritable>
> {
> DoubleWritable val1 = new DoubleWritable();
> Text word = new Text();
> String date;
> float temp;
> public void map(Map<String, ByteBuffer> keys, Map<FloatWritable, ByteBuffer> columns, Context context) throws IOException, InterruptedException
> {
>
> for (Entry<String, ByteBuffer> key : keys.entrySet())
> {
> //System.out.println(key.getKey());
> if (!"date".equals(key.getKey()))
> continue;
> date = ByteBufferUtil.string(key.getValue());
> word.set(date);
> }
>
>
> for (Entry<FloatWritable, ByteBuffer> column : columns.entrySet())
> {
> if (!"temprature".equals(column.getKey()))
> continue;
> temp = ByteBufferUtil.toFloat(column.getValue());
> val1.set(temp);
> //System.out.println(temp);
> }
> context.write(word, val1);
> }
> }
>
> ///////////////////////////////////////////////////////////
>
> public static class dpred1 extends Reducer<Text, DoubleWritable, Text, DoubleWritable>
> {
> public void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException
> {
> double beta = 17.62;
> double landa = 243.12;
> DoubleWritable result1 = new DoubleWritable();
> DoubleWritable result2 = new DoubleWritable();
> for (DoubleWritable val : values){
> // System.out.println(val.get());
> beta *= val.get();
> landa+=val.get();
> }
> result1.set(beta);
> result2.set(landa);
>
> context.write(key, result1);
> context.write(key, result2);
> }
> }
> ///////////////////////////////////////////////////////////
>
> public static class dpmap2 extends Mapper <Text, DoubleWritable, Text, DoubleWritable>{
>
> Text key2 = new Text();
> double temp1, temp2 =0;
>
> public void map(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
> String[] sp = values.toString().split("\t");
> for (int i=0; i< sp.length; i+=4)
> //key2.set(sp[i]);
> System.out.println(sp[i]);
> for(int j=1;j< sp.length; j+=4)
> temp1 = Double.valueOf(sp[j]);
> for (int k=3;k< sp.length; k+=4)
> temp2 = Double.valueOf(sp[k]);
> context.write(key2, new DoubleWritable(temp2/temp1));
>
> }
> }
>
> ///////////////////////////////////////////////////////////
>
>
> public static class dpred2 extends Reducer<Text, DoubleWritable, Text, DoubleWritable>
> {
> public void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException
> {
>
> double alpha = 6.112;
> double tmp = 0;
> DoubleWritable result3 = new DoubleWritable();
> for (DoubleWritable val : values){
> System.out.println(val.get());
> tmp = alpha*(Math.pow(Math.E, val.get()));
>
> }
> result3.set(tmp);
> context.write(key, result3);
>
>
> }
> }
>
>
> ///////////////////////////////////////////////////////////
>
>
> public int run(String[] args) throws Exception
> {
>
> Job job1 = new Job(getConf(), "DewPoint");
> job1.setJarByClass(dewpoint.class);
> job1.setMapperClass(dpmap1.class);
> job1.setOutputFormatClass(SequenceFileOutputFormat.class);
> job1.setCombinerClass(dpred1.class);
> job1.setReducerClass(dpred1.class);
> job1.setMapOutputKeyClass(Text.class);
> job1.setMapOutputValueClass(DoubleWritable.class);
> job1.setOutputKeyClass(Text.class);
> job1.setOutputValueClass(DoubleWritable.class);
> FileOutputFormat.setOutputPath(job1, new Path(OUTPUT_PATH1));
>
>
> job1.setInputFormatClass(CqlPagingInputFormat.class);
>
> ConfigHelper.setInputRpcPort(job1.getConfiguration(), "9160");
> ConfigHelper.setInputInitialAddress(job1.getConfiguration(), "localhost");
> ConfigHelper.setInputColumnFamily(job1.getConfiguration(), KEYSPACE, COLUMN_FAMILY);
> ConfigHelper.setInputPartitioner(job1.getConfiguration(), "Murmur3Partitioner");
>
> CqlConfigHelper.setInputCQLPageRowSize(job1.getConfiguration(), "3");
> job1.waitForCompletion(true);
>
> /***************************************/
>
> if (job1.isSuccessful()){
> Job job2 = new Job(getConf(), "DewPoint");
> job2.setJarByClass(dewpoint.class);
> job2.setMapperClass(dpmap2.class);
> job2.setCombinerClass(dpred2.class);
> job2.setReducerClass(dpred2.class);
> job2.setMapOutputKeyClass(Text.class);
> job2.setMapOutputValueClass(DoubleWritable.class);
> job2.setOutputKeyClass(Text.class);
> job2.setOutputValueClass(DoubleWritable.class);
> job2.setOutputFormatClass(TextOutputFormat.class);
> job2.setInputFormatClass(SequenceFileInputFormat.class);
> FileInputFormat.addInputPath(job2, new Path(OUTPUT_PATH1));
> FileOutputFormat.setOutputPath(job2, new Path(OUTPUT_PATH2));
> job2.waitForCompletion(true);
> }
> ///////////////////////////////////////////////////
>
> return 0;
> }
> }
>
> for example in my second map phase when I do a System.out.println(key) it does not print any thing and in reduce result the value is 'infinity'....
>
>
>
Re: map phase does not read intermediate results with SequenceFileInputFormat
Posted by Dieter De Witte <dr...@gmail.com>.
The question is also on stackoverflow, the problem is that she divides by
zero in the second mapper I think. (the logs show that both jobs have a
data flow..
2013/10/25 Robin East <ro...@xense.co.uk>
> Hi
>
> Are you sure job1 created output where you expected it and in the format
> you expect? Have you tested job2 on its own with some hand-crafted test
> data? Is the output from job1 consistent with your hand-crafted test data
> for job2?
>
> regards
> On 25 Oct 2013, at 06:46, Anseh Danesh <an...@gmail.com> wrote:
>
> Hi all.
>
> I have a mapreduce program with two jobs. second job's key and value comes
> from first job output. but I think the second map does not get the result
> from first job. in other words I think my second job did not read the
> output of my first job.. what should I do?
>
> here is the code:
>
> public class dewpoint extends Configured implements Tool
> {
> private static final Logger logger = LoggerFactory.getLogger(dewpoint.class);
>
> static final String KEYSPACE = "weather";
> static final String COLUMN_FAMILY = "user";
> private static final String OUTPUT_PATH1 = "/tmp/intermediate1";
> private static final String OUTPUT_PATH2 = "/tmp/intermediate2";
> private static final String OUTPUT_PATH3 = "/tmp/intermediate3";
> private static final String INPUT_PATH1 = "/tmp/intermediate1";
>
> public static void main(String[] args) throws Exception
> {
>
> ToolRunner.run(new Configuration(), new dewpoint(), args);
> System.exit(0);
> }
>
> ///////////////////////////////////////////////////////////
>
> public static class dpmap1 extends Mapper<Map<String, ByteBuffer>, Map<FloatWritable, ByteBuffer>, Text, DoubleWritable>
> {
> DoubleWritable val1 = new DoubleWritable();
> Text word = new Text();
> String date;
> float temp;
> public void map(Map<String, ByteBuffer> keys, Map<FloatWritable, ByteBuffer> columns, Context context) throws IOException, InterruptedException
> {
>
> for (Entry<String, ByteBuffer> key : keys.entrySet())
> {
> //System.out.println(key.getKey());
> if (!"date".equals(key.getKey()))
> continue;
> date = ByteBufferUtil.string(key.getValue());
> word.set(date);
> }
>
>
> for (Entry<FloatWritable, ByteBuffer> column : columns.entrySet())
> {
> if (!"temprature".equals(column.getKey()))
> continue;
> temp = ByteBufferUtil.toFloat(column.getValue());
> val1.set(temp);
> //System.out.println(temp);
> }
> context.write(word, val1);
> }
> }
>
> ///////////////////////////////////////////////////////////
>
> public static class dpred1 extends Reducer<Text, DoubleWritable, Text, DoubleWritable>
> {
> public void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException
> {
> double beta = 17.62;
> double landa = 243.12;
> DoubleWritable result1 = new DoubleWritable();
> DoubleWritable result2 = new DoubleWritable();
> for (DoubleWritable val : values){
> // System.out.println(val.get());
> beta *= val.get();
> landa+=val.get();
> }
> result1.set(beta);
> result2.set(landa);
>
> context.write(key, result1);
> context.write(key, result2);
> }
> }
> ///////////////////////////////////////////////////////////
>
> public static class dpmap2 extends Mapper <Text, DoubleWritable, Text, DoubleWritable>{
>
> Text key2 = new Text();
> double temp1, temp2 =0;
>
> public void map(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
> String[] sp = values.toString().split("\t");
> for (int i=0; i< sp.length; i+=4)
> //key2.set(sp[i]);
> System.out.println(sp[i]);
> for(int j=1;j< sp.length; j+=4)
> temp1 = Double.valueOf(sp[j]);
> for (int k=3;k< sp.length; k+=4)
> temp2 = Double.valueOf(sp[k]);
> context.write(key2, new DoubleWritable(temp2/temp1));
>
> }
> }
>
> ///////////////////////////////////////////////////////////
>
>
> public static class dpred2 extends Reducer<Text, DoubleWritable, Text, DoubleWritable>
> {
> public void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException
> {
>
> double alpha = 6.112;
> double tmp = 0;
> DoubleWritable result3 = new DoubleWritable();
> for (DoubleWritable val : values){
> System.out.println(val.get());
> tmp = alpha*(Math.pow(Math.E, val.get()));
>
> }
> result3.set(tmp);
> context.write(key, result3);
>
>
> }
> }
>
>
> ///////////////////////////////////////////////////////////
>
>
> public int run(String[] args) throws Exception
> {
>
> Job job1 = new Job(getConf(), "DewPoint");
> job1.setJarByClass(dewpoint.class);
> job1.setMapperClass(dpmap1.class);
> job1.setOutputFormatClass(SequenceFileOutputFormat.class);
> job1.setCombinerClass(dpred1.class);
> job1.setReducerClass(dpred1.class);
> job1.setMapOutputKeyClass(Text.class);
> job1.setMapOutputValueClass(DoubleWritable.class);
> job1.setOutputKeyClass(Text.class);
> job1.setOutputValueClass(DoubleWritable.class);
> FileOutputFormat.setOutputPath(job1, new Path(OUTPUT_PATH1));
>
>
> job1.setInputFormatClass(CqlPagingInputFormat.class);
>
> ConfigHelper.setInputRpcPort(job1.getConfiguration(), "9160");
> ConfigHelper.setInputInitialAddress(job1.getConfiguration(), "localhost");
> ConfigHelper.setInputColumnFamily(job1.getConfiguration(), KEYSPACE, COLUMN_FAMILY);
> ConfigHelper.setInputPartitioner(job1.getConfiguration(), "Murmur3Partitioner");
>
> CqlConfigHelper.setInputCQLPageRowSize(job1.getConfiguration(), "3");
> job1.waitForCompletion(true);
>
> /***************************************/
>
> if (job1.isSuccessful()){
> Job job2 = new Job(getConf(), "DewPoint");
> job2.setJarByClass(dewpoint.class);
> job2.setMapperClass(dpmap2.class);
> job2.setCombinerClass(dpred2.class);
> job2.setReducerClass(dpred2.class);
> job2.setMapOutputKeyClass(Text.class);
> job2.setMapOutputValueClass(DoubleWritable.class);
> job2.setOutputKeyClass(Text.class);
> job2.setOutputValueClass(DoubleWritable.class);
> job2.setOutputFormatClass(TextOutputFormat.class);
> job2.setInputFormatClass(SequenceFileInputFormat.class);
> FileInputFormat.addInputPath(job2, new Path(OUTPUT_PATH1));
> FileOutputFormat.setOutputPath(job2, new Path(OUTPUT_PATH2));
> job2.waitForCompletion(true);
> }
> ///////////////////////////////////////////////////
>
> return 0;
> }
> }
>
> for example in my second map phase when I do a System.out.println(key) it does not print any thing and in reduce result the value is 'infinity'....
>
>
>
Re: map phase does not read intermediate results with SequenceFileInputFormat
Posted by Dieter De Witte <dr...@gmail.com>.
The question is also on stackoverflow, the problem is that she divides by
zero in the second mapper I think. (the logs show that both jobs have a
data flow..
2013/10/25 Robin East <ro...@xense.co.uk>
> Hi
>
> Are you sure job1 created output where you expected it and in the format
> you expect? Have you tested job2 on its own with some hand-crafted test
> data? Is the output from job1 consistent with your hand-crafted test data
> for job2?
>
> regards
> On 25 Oct 2013, at 06:46, Anseh Danesh <an...@gmail.com> wrote:
>
> Hi all.
>
> I have a mapreduce program with two jobs. second job's key and value comes
> from first job output. but I think the second map does not get the result
> from first job. in other words I think my second job did not read the
> output of my first job.. what should I do?
>
> here is the code:
>
> public class dewpoint extends Configured implements Tool
> {
> private static final Logger logger = LoggerFactory.getLogger(dewpoint.class);
>
> static final String KEYSPACE = "weather";
> static final String COLUMN_FAMILY = "user";
> private static final String OUTPUT_PATH1 = "/tmp/intermediate1";
> private static final String OUTPUT_PATH2 = "/tmp/intermediate2";
> private static final String OUTPUT_PATH3 = "/tmp/intermediate3";
> private static final String INPUT_PATH1 = "/tmp/intermediate1";
>
> public static void main(String[] args) throws Exception
> {
>
> ToolRunner.run(new Configuration(), new dewpoint(), args);
> System.exit(0);
> }
>
> ///////////////////////////////////////////////////////////
>
> public static class dpmap1 extends Mapper<Map<String, ByteBuffer>, Map<FloatWritable, ByteBuffer>, Text, DoubleWritable>
> {
> DoubleWritable val1 = new DoubleWritable();
> Text word = new Text();
> String date;
> float temp;
> public void map(Map<String, ByteBuffer> keys, Map<FloatWritable, ByteBuffer> columns, Context context) throws IOException, InterruptedException
> {
>
> for (Entry<String, ByteBuffer> key : keys.entrySet())
> {
> //System.out.println(key.getKey());
> if (!"date".equals(key.getKey()))
> continue;
> date = ByteBufferUtil.string(key.getValue());
> word.set(date);
> }
>
>
> for (Entry<FloatWritable, ByteBuffer> column : columns.entrySet())
> {
> if (!"temprature".equals(column.getKey()))
> continue;
> temp = ByteBufferUtil.toFloat(column.getValue());
> val1.set(temp);
> //System.out.println(temp);
> }
> context.write(word, val1);
> }
> }
>
> ///////////////////////////////////////////////////////////
>
> public static class dpred1 extends Reducer<Text, DoubleWritable, Text, DoubleWritable>
> {
> public void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException
> {
> double beta = 17.62;
> double landa = 243.12;
> DoubleWritable result1 = new DoubleWritable();
> DoubleWritable result2 = new DoubleWritable();
> for (DoubleWritable val : values){
> // System.out.println(val.get());
> beta *= val.get();
> landa+=val.get();
> }
> result1.set(beta);
> result2.set(landa);
>
> context.write(key, result1);
> context.write(key, result2);
> }
> }
> ///////////////////////////////////////////////////////////
>
> public static class dpmap2 extends Mapper <Text, DoubleWritable, Text, DoubleWritable>{
>
> Text key2 = new Text();
> double temp1, temp2 =0;
>
> public void map(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
> String[] sp = values.toString().split("\t");
> for (int i=0; i< sp.length; i+=4)
> //key2.set(sp[i]);
> System.out.println(sp[i]);
> for(int j=1;j< sp.length; j+=4)
> temp1 = Double.valueOf(sp[j]);
> for (int k=3;k< sp.length; k+=4)
> temp2 = Double.valueOf(sp[k]);
> context.write(key2, new DoubleWritable(temp2/temp1));
>
> }
> }
>
> ///////////////////////////////////////////////////////////
>
>
> public static class dpred2 extends Reducer<Text, DoubleWritable, Text, DoubleWritable>
> {
> public void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException
> {
>
> double alpha = 6.112;
> double tmp = 0;
> DoubleWritable result3 = new DoubleWritable();
> for (DoubleWritable val : values){
> System.out.println(val.get());
> tmp = alpha*(Math.pow(Math.E, val.get()));
>
> }
> result3.set(tmp);
> context.write(key, result3);
>
>
> }
> }
>
>
> ///////////////////////////////////////////////////////////
>
>
> public int run(String[] args) throws Exception
> {
>
> Job job1 = new Job(getConf(), "DewPoint");
> job1.setJarByClass(dewpoint.class);
> job1.setMapperClass(dpmap1.class);
> job1.setOutputFormatClass(SequenceFileOutputFormat.class);
> job1.setCombinerClass(dpred1.class);
> job1.setReducerClass(dpred1.class);
> job1.setMapOutputKeyClass(Text.class);
> job1.setMapOutputValueClass(DoubleWritable.class);
> job1.setOutputKeyClass(Text.class);
> job1.setOutputValueClass(DoubleWritable.class);
> FileOutputFormat.setOutputPath(job1, new Path(OUTPUT_PATH1));
>
>
> job1.setInputFormatClass(CqlPagingInputFormat.class);
>
> ConfigHelper.setInputRpcPort(job1.getConfiguration(), "9160");
> ConfigHelper.setInputInitialAddress(job1.getConfiguration(), "localhost");
> ConfigHelper.setInputColumnFamily(job1.getConfiguration(), KEYSPACE, COLUMN_FAMILY);
> ConfigHelper.setInputPartitioner(job1.getConfiguration(), "Murmur3Partitioner");
>
> CqlConfigHelper.setInputCQLPageRowSize(job1.getConfiguration(), "3");
> job1.waitForCompletion(true);
>
> /***************************************/
>
> if (job1.isSuccessful()){
> Job job2 = new Job(getConf(), "DewPoint");
> job2.setJarByClass(dewpoint.class);
> job2.setMapperClass(dpmap2.class);
> job2.setCombinerClass(dpred2.class);
> job2.setReducerClass(dpred2.class);
> job2.setMapOutputKeyClass(Text.class);
> job2.setMapOutputValueClass(DoubleWritable.class);
> job2.setOutputKeyClass(Text.class);
> job2.setOutputValueClass(DoubleWritable.class);
> job2.setOutputFormatClass(TextOutputFormat.class);
> job2.setInputFormatClass(SequenceFileInputFormat.class);
> FileInputFormat.addInputPath(job2, new Path(OUTPUT_PATH1));
> FileOutputFormat.setOutputPath(job2, new Path(OUTPUT_PATH2));
> job2.waitForCompletion(true);
> }
> ///////////////////////////////////////////////////
>
> return 0;
> }
> }
>
> for example in my second map phase when I do a System.out.println(key) it does not print any thing and in reduce result the value is 'infinity'....
>
>
>
Re: map phase does not read intermediate results with SequenceFileInputFormat
Posted by Robin East <ro...@xense.co.uk>.
Hi
Are you sure job1 created output where you expected it and in the format you expect? Have you tested job2 on its own with some hand-crafted test data? Is the output from job1 consistent with your hand-crafted test data for job2?
regards
On 25 Oct 2013, at 06:46, Anseh Danesh <an...@gmail.com> wrote:
> Hi all.
> I have a mapreduce program with two jobs. second job's key and value comes from first job output. but I think the second map does not get the result from first job. in other words I think my second job did not read the output of my first job.. what should I do?
>
> here is the code:
>
> public class dewpoint extends Configured implements Tool
> {
> private static final Logger logger = LoggerFactory.getLogger(dewpoint.class);
>
> static final String KEYSPACE = "weather";
> static final String COLUMN_FAMILY = "user";
> private static final String OUTPUT_PATH1 = "/tmp/intermediate1";
> private static final String OUTPUT_PATH2 = "/tmp/intermediate2";
> private static final String OUTPUT_PATH3 = "/tmp/intermediate3";
> private static final String INPUT_PATH1 = "/tmp/intermediate1";
>
> public static void main(String[] args) throws Exception
> {
>
> ToolRunner.run(new Configuration(), new dewpoint(), args);
> System.exit(0);
> }
>
> ///////////////////////////////////////////////////////////
>
> public static class dpmap1 extends Mapper<Map<String, ByteBuffer>, Map<FloatWritable, ByteBuffer>, Text, DoubleWritable>
> {
> DoubleWritable val1 = new DoubleWritable();
> Text word = new Text();
> String date;
> float temp;
> public void map(Map<String, ByteBuffer> keys, Map<FloatWritable, ByteBuffer> columns, Context context) throws IOException, InterruptedException
> {
>
> for (Entry<String, ByteBuffer> key : keys.entrySet())
> {
> //System.out.println(key.getKey());
> if (!"date".equals(key.getKey()))
> continue;
> date = ByteBufferUtil.string(key.getValue());
> word.set(date);
> }
>
>
> for (Entry<FloatWritable, ByteBuffer> column : columns.entrySet())
> {
> if (!"temprature".equals(column.getKey()))
> continue;
> temp = ByteBufferUtil.toFloat(column.getValue());
> val1.set(temp);
> //System.out.println(temp);
> }
> context.write(word, val1);
> }
> }
>
> ///////////////////////////////////////////////////////////
>
> public static class dpred1 extends Reducer<Text, DoubleWritable, Text, DoubleWritable>
> {
> public void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException
> {
> double beta = 17.62;
> double landa = 243.12;
> DoubleWritable result1 = new DoubleWritable();
> DoubleWritable result2 = new DoubleWritable();
> for (DoubleWritable val : values){
> // System.out.println(val.get());
> beta *= val.get();
> landa+=val.get();
> }
> result1.set(beta);
> result2.set(landa);
>
> context.write(key, result1);
> context.write(key, result2);
> }
> }
> ///////////////////////////////////////////////////////////
>
> public static class dpmap2 extends Mapper <Text, DoubleWritable, Text, DoubleWritable>{
>
> Text key2 = new Text();
> double temp1, temp2 =0;
>
> public void map(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
> String[] sp = values.toString().split("\t");
> for (int i=0; i< sp.length; i+=4)
> //key2.set(sp[i]);
> System.out.println(sp[i]);
> for(int j=1;j< sp.length; j+=4)
> temp1 = Double.valueOf(sp[j]);
> for (int k=3;k< sp.length; k+=4)
> temp2 = Double.valueOf(sp[k]);
> context.write(key2, new DoubleWritable(temp2/temp1));
>
> }
> }
>
> ///////////////////////////////////////////////////////////
>
>
> public static class dpred2 extends Reducer<Text, DoubleWritable, Text, DoubleWritable>
> {
> public void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException
> {
>
> double alpha = 6.112;
> double tmp = 0;
> DoubleWritable result3 = new DoubleWritable();
> for (DoubleWritable val : values){
> System.out.println(val.get());
> tmp = alpha*(Math.pow(Math.E, val.get()));
>
> }
> result3.set(tmp);
> context.write(key, result3);
>
>
> }
> }
>
>
> ///////////////////////////////////////////////////////////
>
>
> public int run(String[] args) throws Exception
> {
>
> Job job1 = new Job(getConf(), "DewPoint");
> job1.setJarByClass(dewpoint.class);
> job1.setMapperClass(dpmap1.class);
> job1.setOutputFormatClass(SequenceFileOutputFormat.class);
> job1.setCombinerClass(dpred1.class);
> job1.setReducerClass(dpred1.class);
> job1.setMapOutputKeyClass(Text.class);
> job1.setMapOutputValueClass(DoubleWritable.class);
> job1.setOutputKeyClass(Text.class);
> job1.setOutputValueClass(DoubleWritable.class);
> FileOutputFormat.setOutputPath(job1, new Path(OUTPUT_PATH1));
>
>
> job1.setInputFormatClass(CqlPagingInputFormat.class);
>
> ConfigHelper.setInputRpcPort(job1.getConfiguration(), "9160");
> ConfigHelper.setInputInitialAddress(job1.getConfiguration(), "localhost");
> ConfigHelper.setInputColumnFamily(job1.getConfiguration(), KEYSPACE, COLUMN_FAMILY);
> ConfigHelper.setInputPartitioner(job1.getConfiguration(), "Murmur3Partitioner");
>
> CqlConfigHelper.setInputCQLPageRowSize(job1.getConfiguration(), "3");
> job1.waitForCompletion(true);
>
> /***************************************/
>
> if (job1.isSuccessful()){
> Job job2 = new Job(getConf(), "DewPoint");
> job2.setJarByClass(dewpoint.class);
> job2.setMapperClass(dpmap2.class);
> job2.setCombinerClass(dpred2.class);
> job2.setReducerClass(dpred2.class);
> job2.setMapOutputKeyClass(Text.class);
> job2.setMapOutputValueClass(DoubleWritable.class);
> job2.setOutputKeyClass(Text.class);
> job2.setOutputValueClass(DoubleWritable.class);
> job2.setOutputFormatClass(TextOutputFormat.class);
> job2.setInputFormatClass(SequenceFileInputFormat.class);
> FileInputFormat.addInputPath(job2, new Path(OUTPUT_PATH1));
> FileOutputFormat.setOutputPath(job2, new Path(OUTPUT_PATH2));
> job2.waitForCompletion(true);
> }
> ///////////////////////////////////////////////////
>
> return 0;
> }
> }
>
> for example in my second map phase when I do a System.out.println(key) it does not print any thing and in reduce result the value is 'infinity'....
Re: map phase does not read intermediate results with SequenceFileInputFormat
Posted by Robin East <ro...@xense.co.uk>.
Hi
Are you sure job1 created output where you expected it and in the format you expect? Have you tested job2 on its own with some hand-crafted test data? Is the output from job1 consistent with your hand-crafted test data for job2?
regards
On 25 Oct 2013, at 06:46, Anseh Danesh <an...@gmail.com> wrote:
> Hi all.
> I have a mapreduce program with two jobs. second job's key and value comes from first job output. but I think the second map does not get the result from first job. in other words I think my second job did not read the output of my first job.. what should I do?
>
> here is the code:
>
> public class dewpoint extends Configured implements Tool
> {
> private static final Logger logger = LoggerFactory.getLogger(dewpoint.class);
>
> static final String KEYSPACE = "weather";
> static final String COLUMN_FAMILY = "user";
> private static final String OUTPUT_PATH1 = "/tmp/intermediate1";
> private static final String OUTPUT_PATH2 = "/tmp/intermediate2";
> private static final String OUTPUT_PATH3 = "/tmp/intermediate3";
> private static final String INPUT_PATH1 = "/tmp/intermediate1";
>
> public static void main(String[] args) throws Exception
> {
>
> ToolRunner.run(new Configuration(), new dewpoint(), args);
> System.exit(0);
> }
>
> ///////////////////////////////////////////////////////////
>
> public static class dpmap1 extends Mapper<Map<String, ByteBuffer>, Map<FloatWritable, ByteBuffer>, Text, DoubleWritable>
> {
> DoubleWritable val1 = new DoubleWritable();
> Text word = new Text();
> String date;
> float temp;
> public void map(Map<String, ByteBuffer> keys, Map<FloatWritable, ByteBuffer> columns, Context context) throws IOException, InterruptedException
> {
>
> for (Entry<String, ByteBuffer> key : keys.entrySet())
> {
> //System.out.println(key.getKey());
> if (!"date".equals(key.getKey()))
> continue;
> date = ByteBufferUtil.string(key.getValue());
> word.set(date);
> }
>
>
> for (Entry<FloatWritable, ByteBuffer> column : columns.entrySet())
> {
> if (!"temprature".equals(column.getKey()))
> continue;
> temp = ByteBufferUtil.toFloat(column.getValue());
> val1.set(temp);
> //System.out.println(temp);
> }
> context.write(word, val1);
> }
> }
>
> ///////////////////////////////////////////////////////////
>
> public static class dpred1 extends Reducer<Text, DoubleWritable, Text, DoubleWritable>
> {
> public void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException
> {
> double beta = 17.62;
> double landa = 243.12;
> DoubleWritable result1 = new DoubleWritable();
> DoubleWritable result2 = new DoubleWritable();
> for (DoubleWritable val : values){
> // System.out.println(val.get());
> beta *= val.get();
> landa+=val.get();
> }
> result1.set(beta);
> result2.set(landa);
>
> context.write(key, result1);
> context.write(key, result2);
> }
> }
> ///////////////////////////////////////////////////////////
>
> public static class dpmap2 extends Mapper <Text, DoubleWritable, Text, DoubleWritable>{
>
> Text key2 = new Text();
> double temp1, temp2 =0;
>
> public void map(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
> String[] sp = values.toString().split("\t");
> for (int i=0; i< sp.length; i+=4)
> //key2.set(sp[i]);
> System.out.println(sp[i]);
> for(int j=1;j< sp.length; j+=4)
> temp1 = Double.valueOf(sp[j]);
> for (int k=3;k< sp.length; k+=4)
> temp2 = Double.valueOf(sp[k]);
> context.write(key2, new DoubleWritable(temp2/temp1));
>
> }
> }
>
> ///////////////////////////////////////////////////////////
>
>
> public static class dpred2 extends Reducer<Text, DoubleWritable, Text, DoubleWritable>
> {
> public void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException
> {
>
> double alpha = 6.112;
> double tmp = 0;
> DoubleWritable result3 = new DoubleWritable();
> for (DoubleWritable val : values){
> System.out.println(val.get());
> tmp = alpha*(Math.pow(Math.E, val.get()));
>
> }
> result3.set(tmp);
> context.write(key, result3);
>
>
> }
> }
>
>
> ///////////////////////////////////////////////////////////
>
>
> public int run(String[] args) throws Exception
> {
>
> Job job1 = new Job(getConf(), "DewPoint");
> job1.setJarByClass(dewpoint.class);
> job1.setMapperClass(dpmap1.class);
> job1.setOutputFormatClass(SequenceFileOutputFormat.class);
> job1.setCombinerClass(dpred1.class);
> job1.setReducerClass(dpred1.class);
> job1.setMapOutputKeyClass(Text.class);
> job1.setMapOutputValueClass(DoubleWritable.class);
> job1.setOutputKeyClass(Text.class);
> job1.setOutputValueClass(DoubleWritable.class);
> FileOutputFormat.setOutputPath(job1, new Path(OUTPUT_PATH1));
>
>
> job1.setInputFormatClass(CqlPagingInputFormat.class);
>
> ConfigHelper.setInputRpcPort(job1.getConfiguration(), "9160");
> ConfigHelper.setInputInitialAddress(job1.getConfiguration(), "localhost");
> ConfigHelper.setInputColumnFamily(job1.getConfiguration(), KEYSPACE, COLUMN_FAMILY);
> ConfigHelper.setInputPartitioner(job1.getConfiguration(), "Murmur3Partitioner");
>
> CqlConfigHelper.setInputCQLPageRowSize(job1.getConfiguration(), "3");
> job1.waitForCompletion(true);
>
> /***************************************/
>
> if (job1.isSuccessful()){
> Job job2 = new Job(getConf(), "DewPoint");
> job2.setJarByClass(dewpoint.class);
> job2.setMapperClass(dpmap2.class);
> job2.setCombinerClass(dpred2.class);
> job2.setReducerClass(dpred2.class);
> job2.setMapOutputKeyClass(Text.class);
> job2.setMapOutputValueClass(DoubleWritable.class);
> job2.setOutputKeyClass(Text.class);
> job2.setOutputValueClass(DoubleWritable.class);
> job2.setOutputFormatClass(TextOutputFormat.class);
> job2.setInputFormatClass(SequenceFileInputFormat.class);
> FileInputFormat.addInputPath(job2, new Path(OUTPUT_PATH1));
> FileOutputFormat.setOutputPath(job2, new Path(OUTPUT_PATH2));
> job2.waitForCompletion(true);
> }
> ///////////////////////////////////////////////////
>
> return 0;
> }
> }
>
> for example in my second map phase when I do a System.out.println(key) it does not print any thing and in reduce result the value is 'infinity'....
Re: map phase does not read intermediate results with SequenceFileInputFormat
Posted by Robin East <ro...@xense.co.uk>.
Hi
Are you sure job1 created output where you expected it and in the format you expect? Have you tested job2 on its own with some hand-crafted test data? Is the output from job1 consistent with your hand-crafted test data for job2?
regards
On 25 Oct 2013, at 06:46, Anseh Danesh <an...@gmail.com> wrote:
> Hi all.
> I have a mapreduce program with two jobs. second job's key and value comes from first job output. but I think the second map does not get the result from first job. in other words I think my second job did not read the output of my first job.. what should I do?
>
> here is the code:
>
> public class dewpoint extends Configured implements Tool
> {
> private static final Logger logger = LoggerFactory.getLogger(dewpoint.class);
>
> static final String KEYSPACE = "weather";
> static final String COLUMN_FAMILY = "user";
> private static final String OUTPUT_PATH1 = "/tmp/intermediate1";
> private static final String OUTPUT_PATH2 = "/tmp/intermediate2";
> private static final String OUTPUT_PATH3 = "/tmp/intermediate3";
> private static final String INPUT_PATH1 = "/tmp/intermediate1";
>
> public static void main(String[] args) throws Exception
> {
>
> ToolRunner.run(new Configuration(), new dewpoint(), args);
> System.exit(0);
> }
>
> ///////////////////////////////////////////////////////////
>
> public static class dpmap1 extends Mapper<Map<String, ByteBuffer>, Map<FloatWritable, ByteBuffer>, Text, DoubleWritable>
> {
> DoubleWritable val1 = new DoubleWritable();
> Text word = new Text();
> String date;
> float temp;
> public void map(Map<String, ByteBuffer> keys, Map<FloatWritable, ByteBuffer> columns, Context context) throws IOException, InterruptedException
> {
>
> for (Entry<String, ByteBuffer> key : keys.entrySet())
> {
> //System.out.println(key.getKey());
> if (!"date".equals(key.getKey()))
> continue;
> date = ByteBufferUtil.string(key.getValue());
> word.set(date);
> }
>
>
> for (Entry<FloatWritable, ByteBuffer> column : columns.entrySet())
> {
> if (!"temprature".equals(column.getKey()))
> continue;
> temp = ByteBufferUtil.toFloat(column.getValue());
> val1.set(temp);
> //System.out.println(temp);
> }
> context.write(word, val1);
> }
> }
>
> ///////////////////////////////////////////////////////////
>
> public static class dpred1 extends Reducer<Text, DoubleWritable, Text, DoubleWritable>
> {
> public void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException
> {
> double beta = 17.62;
> double landa = 243.12;
> DoubleWritable result1 = new DoubleWritable();
> DoubleWritable result2 = new DoubleWritable();
> for (DoubleWritable val : values){
> // System.out.println(val.get());
> beta *= val.get();
> landa+=val.get();
> }
> result1.set(beta);
> result2.set(landa);
>
> context.write(key, result1);
> context.write(key, result2);
> }
> }
> ///////////////////////////////////////////////////////////
>
> public static class dpmap2 extends Mapper <Text, DoubleWritable, Text, DoubleWritable>{
>
> Text key2 = new Text();
> double temp1, temp2 =0;
>
> public void map(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
> String[] sp = values.toString().split("\t");
> for (int i=0; i< sp.length; i+=4)
> //key2.set(sp[i]);
> System.out.println(sp[i]);
> for(int j=1;j< sp.length; j+=4)
> temp1 = Double.valueOf(sp[j]);
> for (int k=3;k< sp.length; k+=4)
> temp2 = Double.valueOf(sp[k]);
> context.write(key2, new DoubleWritable(temp2/temp1));
>
> }
> }
>
> ///////////////////////////////////////////////////////////
>
>
> public static class dpred2 extends Reducer<Text, DoubleWritable, Text, DoubleWritable>
> {
> public void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException
> {
>
> double alpha = 6.112;
> double tmp = 0;
> DoubleWritable result3 = new DoubleWritable();
> for (DoubleWritable val : values){
> System.out.println(val.get());
> tmp = alpha*(Math.pow(Math.E, val.get()));
>
> }
> result3.set(tmp);
> context.write(key, result3);
>
>
> }
> }
>
>
> ///////////////////////////////////////////////////////////
>
>
> public int run(String[] args) throws Exception
> {
>
> Job job1 = new Job(getConf(), "DewPoint");
> job1.setJarByClass(dewpoint.class);
> job1.setMapperClass(dpmap1.class);
> job1.setOutputFormatClass(SequenceFileOutputFormat.class);
> job1.setCombinerClass(dpred1.class);
> job1.setReducerClass(dpred1.class);
> job1.setMapOutputKeyClass(Text.class);
> job1.setMapOutputValueClass(DoubleWritable.class);
> job1.setOutputKeyClass(Text.class);
> job1.setOutputValueClass(DoubleWritable.class);
> FileOutputFormat.setOutputPath(job1, new Path(OUTPUT_PATH1));
>
>
> job1.setInputFormatClass(CqlPagingInputFormat.class);
>
> ConfigHelper.setInputRpcPort(job1.getConfiguration(), "9160");
> ConfigHelper.setInputInitialAddress(job1.getConfiguration(), "localhost");
> ConfigHelper.setInputColumnFamily(job1.getConfiguration(), KEYSPACE, COLUMN_FAMILY);
> ConfigHelper.setInputPartitioner(job1.getConfiguration(), "Murmur3Partitioner");
>
> CqlConfigHelper.setInputCQLPageRowSize(job1.getConfiguration(), "3");
> job1.waitForCompletion(true);
>
> /***************************************/
>
> if (job1.isSuccessful()){
> Job job2 = new Job(getConf(), "DewPoint");
> job2.setJarByClass(dewpoint.class);
> job2.setMapperClass(dpmap2.class);
> job2.setCombinerClass(dpred2.class);
> job2.setReducerClass(dpred2.class);
> job2.setMapOutputKeyClass(Text.class);
> job2.setMapOutputValueClass(DoubleWritable.class);
> job2.setOutputKeyClass(Text.class);
> job2.setOutputValueClass(DoubleWritable.class);
> job2.setOutputFormatClass(TextOutputFormat.class);
> job2.setInputFormatClass(SequenceFileInputFormat.class);
> FileInputFormat.addInputPath(job2, new Path(OUTPUT_PATH1));
> FileOutputFormat.setOutputPath(job2, new Path(OUTPUT_PATH2));
> job2.waitForCompletion(true);
> }
> ///////////////////////////////////////////////////
>
> return 0;
> }
> }
>
> for example in my second map phase when I do a System.out.println(key) it does not print any thing and in reduce result the value is 'infinity'....
Re: map phase does not read intermediate results with SequenceFileInputFormat
Posted by Robin East <ro...@xense.co.uk>.
Hi
Are you sure job1 created output where you expected it and in the format you expect? Have you tested job2 on its own with some hand-crafted test data? Is the output from job1 consistent with your hand-crafted test data for job2?
regards
On 25 Oct 2013, at 06:46, Anseh Danesh <an...@gmail.com> wrote:
> Hi all.
> I have a mapreduce program with two jobs. second job's key and value comes from first job output. but I think the second map does not get the result from first job. in other words I think my second job did not read the output of my first job.. what should I do?
>
> here is the code:
>
> public class dewpoint extends Configured implements Tool
> {
> private static final Logger logger = LoggerFactory.getLogger(dewpoint.class);
>
> static final String KEYSPACE = "weather";
> static final String COLUMN_FAMILY = "user";
> private static final String OUTPUT_PATH1 = "/tmp/intermediate1";
> private static final String OUTPUT_PATH2 = "/tmp/intermediate2";
> private static final String OUTPUT_PATH3 = "/tmp/intermediate3";
> private static final String INPUT_PATH1 = "/tmp/intermediate1";
>
> public static void main(String[] args) throws Exception
> {
>
> ToolRunner.run(new Configuration(), new dewpoint(), args);
> System.exit(0);
> }
>
> ///////////////////////////////////////////////////////////
>
> public static class dpmap1 extends Mapper<Map<String, ByteBuffer>, Map<FloatWritable, ByteBuffer>, Text, DoubleWritable>
> {
> DoubleWritable val1 = new DoubleWritable();
> Text word = new Text();
> String date;
> float temp;
> public void map(Map<String, ByteBuffer> keys, Map<FloatWritable, ByteBuffer> columns, Context context) throws IOException, InterruptedException
> {
>
> for (Entry<String, ByteBuffer> key : keys.entrySet())
> {
> //System.out.println(key.getKey());
> if (!"date".equals(key.getKey()))
> continue;
> date = ByteBufferUtil.string(key.getValue());
> word.set(date);
> }
>
>
> for (Entry<FloatWritable, ByteBuffer> column : columns.entrySet())
> {
> if (!"temprature".equals(column.getKey()))
> continue;
> temp = ByteBufferUtil.toFloat(column.getValue());
> val1.set(temp);
> //System.out.println(temp);
> }
> context.write(word, val1);
> }
> }
>
> ///////////////////////////////////////////////////////////
>
> public static class dpred1 extends Reducer<Text, DoubleWritable, Text, DoubleWritable>
> {
> public void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException
> {
> double beta = 17.62;
> double landa = 243.12;
> DoubleWritable result1 = new DoubleWritable();
> DoubleWritable result2 = new DoubleWritable();
> for (DoubleWritable val : values){
> // System.out.println(val.get());
> beta *= val.get();
> landa+=val.get();
> }
> result1.set(beta);
> result2.set(landa);
>
> context.write(key, result1);
> context.write(key, result2);
> }
> }
> ///////////////////////////////////////////////////////////
>
> public static class dpmap2 extends Mapper <Text, DoubleWritable, Text, DoubleWritable>{
>
> Text key2 = new Text();
> double temp1, temp2 =0;
>
> public void map(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
> String[] sp = values.toString().split("\t");
> for (int i=0; i< sp.length; i+=4)
> //key2.set(sp[i]);
> System.out.println(sp[i]);
> for(int j=1;j< sp.length; j+=4)
> temp1 = Double.valueOf(sp[j]);
> for (int k=3;k< sp.length; k+=4)
> temp2 = Double.valueOf(sp[k]);
> context.write(key2, new DoubleWritable(temp2/temp1));
>
> }
> }
>
> ///////////////////////////////////////////////////////////
>
>
> public static class dpred2 extends Reducer<Text, DoubleWritable, Text, DoubleWritable>
> {
> public void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException
> {
>
> double alpha = 6.112;
> double tmp = 0;
> DoubleWritable result3 = new DoubleWritable();
> for (DoubleWritable val : values){
> System.out.println(val.get());
> tmp = alpha*(Math.pow(Math.E, val.get()));
>
> }
> result3.set(tmp);
> context.write(key, result3);
>
>
> }
> }
>
>
> ///////////////////////////////////////////////////////////
>
>
> public int run(String[] args) throws Exception
> {
>
> Job job1 = new Job(getConf(), "DewPoint");
> job1.setJarByClass(dewpoint.class);
> job1.setMapperClass(dpmap1.class);
> job1.setOutputFormatClass(SequenceFileOutputFormat.class);
> job1.setCombinerClass(dpred1.class);
> job1.setReducerClass(dpred1.class);
> job1.setMapOutputKeyClass(Text.class);
> job1.setMapOutputValueClass(DoubleWritable.class);
> job1.setOutputKeyClass(Text.class);
> job1.setOutputValueClass(DoubleWritable.class);
> FileOutputFormat.setOutputPath(job1, new Path(OUTPUT_PATH1));
>
>
> job1.setInputFormatClass(CqlPagingInputFormat.class);
>
> ConfigHelper.setInputRpcPort(job1.getConfiguration(), "9160");
> ConfigHelper.setInputInitialAddress(job1.getConfiguration(), "localhost");
> ConfigHelper.setInputColumnFamily(job1.getConfiguration(), KEYSPACE, COLUMN_FAMILY);
> ConfigHelper.setInputPartitioner(job1.getConfiguration(), "Murmur3Partitioner");
>
> CqlConfigHelper.setInputCQLPageRowSize(job1.getConfiguration(), "3");
> job1.waitForCompletion(true);
>
> /***************************************/
>
> if (job1.isSuccessful()){
> Job job2 = new Job(getConf(), "DewPoint");
> job2.setJarByClass(dewpoint.class);
> job2.setMapperClass(dpmap2.class);
> job2.setCombinerClass(dpred2.class);
> job2.setReducerClass(dpred2.class);
> job2.setMapOutputKeyClass(Text.class);
> job2.setMapOutputValueClass(DoubleWritable.class);
> job2.setOutputKeyClass(Text.class);
> job2.setOutputValueClass(DoubleWritable.class);
> job2.setOutputFormatClass(TextOutputFormat.class);
> job2.setInputFormatClass(SequenceFileInputFormat.class);
> FileInputFormat.addInputPath(job2, new Path(OUTPUT_PATH1));
> FileOutputFormat.setOutputPath(job2, new Path(OUTPUT_PATH2));
> job2.waitForCompletion(true);
> }
> ///////////////////////////////////////////////////
>
> return 0;
> }
> }
>
> for example in my second map phase when I do a System.out.println(key) it does not print any thing and in reduce result the value is 'infinity'....