Posted to common-user@hadoop.apache.org by Ranadip Chatterjee <ra...@gmail.com> on 2015/04/01 08:27:01 UTC

Re: Simple MapReduce logic using Java API

Eating up the IOException in the mapper looks suspicious to me. That can
silently consume the input without producing any output. Also check the map
tasks' stdout logs for your console print output.
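
For example, the catch block at the bottom of the mapper could simply
rethrow instead of swallowing the error; map() already declares
IOException, so no signature change is needed:

    } catch (IOException failed) {
        // Fail the task loudly so the error shows up in the task logs
        throw failed;
    }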

As an aside, since you are not doing anything in the reduce, try setting
the number of reducers to 0. That will force the job to be map-only and
make it simpler.
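
In the new API that is a one-line change in the driver:

    job.setNumReduceTasks(0);  // map-only: mapper output goes straight to the output path

(Incidentally, the reduce() signature as posted takes a Text value rather
than an Iterable<Text> values, so it never actually overrides
Reducer.reduce() and the default identity reduce runs regardless; the
@Override suggestion below would have caught that at compile time.)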

Regards,
Ranadip
On 31 Mar 2015 19:23, "Shahab Yunus" <sh...@gmail.com> wrote:

> What is the reason for using the queue?
> "job.getConfiguration().set("mapred.job.queue.name", "exp_dsa");"
>
> Is your mapper or reducer even being called?
>
> Try adding the override annotation to the map/reduce methods as below:
>
> @Override
>  public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
>
> Regards,
> Shahab
>
> On Tue, Mar 31, 2015 at 3:26 AM, bradford li <br...@gmail.com> wrote:
>
>> I'm not sure why my Mapper and Reducer have no output. The logic behind
>> my code is: given a file of UUIDs (newline separated), I want to use
>> `globStatus` to find the paths of all files that each UUID might be in,
>> then open and read those files. Each file contains 1-n lines of JSON, and
>> the UUID is in `event_header.event_id` in the JSON.
>>
>> Right now the MapReduce job runs without errors. However, something is
>> wrong because I don't get any output. I'm also not sure how to debug
>> MapReduce jobs. If someone could point me to a resource on that, it would
>> be awesome! The expected output from this program should be:
>>
>>     UUID_1 1
>>     UUID_2 1
>>     UUID_3 1
>>     UUID_4 1
>>     ...
>>     ...
>>     UUID_n 1
>>
>> In my logic, the output file should be the UUIDs each with a 1 next to
>> them: when a UUID is found, a 1 is written; when it is not, a 0 is
>> written. They should all be 1s, because I pulled the UUIDs from the
>> source files.
>>
>> My Reducer currently does not do anything; I just wanted to see if I
>> could get some simple logic working first. There are most likely bugs in
>> my code, as I don't have an easy way to debug MapReduce jobs.
>>
>> Driver:
>>
>>     public class SearchUUID {
>>
>>         public static void main(String[] args) throws Exception {
>>             Configuration conf = new Configuration();
>>             Job job = Job.getInstance(conf, "UUID Search");
>>             job.getConfiguration().set("mapred.job.queue.name", "exp_dsa");
>>             job.setJarByClass(SearchUUID.class);
>>             job.setMapperClass(UUIDMapper.class);
>>             job.setReducerClass(UUIDReducer.class);
>>             job.setOutputKeyClass(Text.class);
>>             job.setOutputValueClass(Text.class);
>>             FileInputFormat.addInputPath(job, new Path(args[0]));
>>             FileOutputFormat.setOutputPath(job, new Path(args[1]));
>>             System.exit(job.waitForCompletion(true) ? 0 : 1);
>>         }
>>     }
>>
>>
>> UUIDMapper:
>>
>>     public class UUIDMapper extends Mapper<Object, Text, Text, Text> {
>>         public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
>>
>>             try {
>>                 Text one = new Text("1");
>>                 Text zero = new Text("0");
>>
>>                 FileSystem fs = FileSystem.get(new Configuration());
>>                 FileStatus[] paths = fs.globStatus(new Path("/data/path/to/file/d_20150330-1650"));
>>                 for (FileStatus path : paths) {
>>                     BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(path.getPath())));
>>                     String json_string = br.readLine();
>>                     while (json_string != null) {
>>                         JsonElement jelement = new JsonParser().parse(json_string);
>>                         JsonObject jsonObject = jelement.getAsJsonObject();
>>                         jsonObject = jsonObject.getAsJsonObject("event_header");
>>                         jsonObject = jsonObject.getAsJsonObject("event_id");
>>
>>                         if (value.toString().equals(jsonObject.getAsString())) {
>>                             System.out.println(value.toString() + "slkdjfksajflkjsfdkljsadfk;ljasklfjklasjfklsadl;sjdf");
>>                             context.write(value, one);
>>                         } else {
>>                             context.write(value, zero);
>>                         }
>>
>>                         json_string = br.readLine();
>>                     }
>>                 }
>>             } catch (IOException failed) {
>>             }
>>         }
>>     }
>>
>>
>> Reducer:
>>
>>     public class UUIDReducer extends Reducer<Text, Text, Text, Text> {
>>
>>         public void reduce(Text key, Text value, Context context) throws IOException, InterruptedException {
>>             context.write(key, value);
>>         }
>>     }
>>
>>
>

Re: Simple MapReduce logic using Java API

Posted by Harshit Mathur <ma...@gmail.com>.
Why are you reading the files with a BufferedReader in the map function?


The problem with your code might be the following: the files in
"/data/path/to/file/d_20150330-1650" will be stored locally and will not
be accessible to the mappers running on different nodes, and since the
IOException is eaten up in your mapper code, you are not getting the
proper stack trace.

I think you should use the distributed cache to ship the files in
"/data/path/to/file/d_20150330-1650" with the job, and then use the setup
method to load the data from those files, as in the sketch below.
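
A rough sketch of that approach (this assumes Hadoop 2.x, that
event_header.event_id is a plain JSON string, and that the event files
fit in memory; the names here are illustrative, not tested code):

    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.IOException;
    import java.net.URI;
    import java.util.HashSet;
    import java.util.Set;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    import com.google.gson.JsonObject;
    import com.google.gson.JsonParser;

    // Driver side, before submission (replaces the globStatus call inside map()):
    //     FileSystem fs = FileSystem.get(job.getConfiguration());
    //     for (FileStatus s : fs.globStatus(new Path("/data/path/to/file/d_20150330-1650")))
    //         job.addCacheFile(s.getPath().toUri());

    public class UUIDMapper extends Mapper<Object, Text, Text, Text> {

        private final Set<String> knownIds = new HashSet<String>();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // Files added with job.addCacheFile(...) are copied to each task
            // node and symlinked into the working directory under their name.
            for (URI cached : context.getCacheFiles()) {
                String localName = new Path(cached.getPath()).getName();
                BufferedReader br = new BufferedReader(new FileReader(localName));
                try {
                    String line;
                    while ((line = br.readLine()) != null) {
                        JsonObject header = new JsonParser().parse(line)
                                .getAsJsonObject().getAsJsonObject("event_header");
                        knownIds.add(header.get("event_id").getAsString());
                    }
                } finally {
                    br.close();
                }
            }
        }

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // One in-memory lookup per input UUID instead of re-reading
            // every event file for every record.
            context.write(value, new Text(knownIds.contains(value.toString()) ? "1" : "0"));
        }
    }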


BR,
Harshit Mathur

