Posted to mapreduce-user@hadoop.apache.org by Shanu Sushmita <sh...@gmail.com> on 2012/07/20 18:13:26 UTC

Comparing input hdfs file to a distributed cache files

Hi,

I am trying to solve a problem where I need to compute the frequencies
with which the words listed in file1 occur in file2.

For example:
text in file1:
hadoop
user
hello
world

and text in file2 is:
hadoop
user
hello
world
hadoop
hadoop
hadoop
user
world
world
world
hadoop
user
hello

so the output should be:
hadoop 5
user 3
hello 2
world 4

I read that distributed caching is a good way to do such jobs. The sizes
of my files are:
File1 = 17GB
File2 = 3 MB

And here is my code:




import org.apache.hadoop.fs.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
import java.util.*;
import java.io.*;
import org.apache.hadoop.filecache.*;
import java.net.*;

public class RepeatFreqTest {

  public static class Map extends MapReduceBase
      implements Mapper<LongWritable, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
    // Lines of the cached file, kept in memory for the whole task.
    private HashSet<String> dyads = new HashSet<String>();

    public void configure(JobConf conf) {
      Path[] cacheFiles = new Path[0];
      try {
        cacheFiles = DistributedCache.getLocalCacheFiles(conf);
      } catch (IOException ioe) {
        System.err.println("Caught exception while getting cached files: "
            + StringUtils.stringifyException(ioe));
      }
      if (cacheFiles == null) {
        return; // no files were registered in the DistributedCache
      }
      for (Path dyadPath : cacheFiles) {
        loadDyads(dyadPath);
      }
    } // end of configure

    private void loadDyads(Path dyadPath) {
      try {
        BufferedReader wordReader =
            new BufferedReader(new FileReader(dyadPath.toString()));
        String line = null;
        while ((line = wordReader.readLine()) != null) {
          dyads.add(line);
        }
        wordReader.close();
      } catch (IOException ioe) {
        System.err.println("IOException reading from distributed cache");
      }
    } // end of loadDyads()

    @Override
    public void map(LongWritable key, Text value,
        OutputCollector<Text, IntWritable> output, Reporter reporter)
        throws IOException {

      // dyad, ut and year from the all-dyads (big) file, i.e. the job input
      String line = value.toString();
      String[] tokens = line.split("\\|");
      String ut1 = tokens[0].trim();
      String dyad1 = tokens[1].trim();
      String year1 = tokens[2].trim();
      int y1 = Integer.parseInt(year1);

      // dyad, ut and year from the sample dyads file held in memory
      for (String setline : dyads) {
        String[] tokens2 = setline.split("\\|");
        String ut2 = tokens2[0].trim();
        String dyad2 = tokens2[1].trim();
        String year2 = tokens2[2].trim();
        int y2 = Integer.parseInt(year2);

        if (dyad1.equalsIgnoreCase(dyad2)) {
          if (!(ut1.equalsIgnoreCase(ut2))) {
            if (y1 <= y2) {
              word.set(setline);
              output.collect(word, one);
            }
          } // end of if ut1 != ut2
        }
      } // end of loop over cached lines
    } // end of map
  } // end of big Map class

  public static class Reduce extends MapReduceBase
      implements Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterator<IntWritable> values,
        OutputCollector<Text, IntWritable> output, Reporter reporter)
        throws IOException {
      int sum = 0;
      while (values.hasNext()) {
        sum += values.next().get();
      }
      output.collect(key, new IntWritable(sum));
    } // end of reduce
  } // end of big Reduce class

  public static void main(String[] args) throws Exception {

    JobConf conf = new JobConf(RepeatFreqTest.class);
    conf.setJobName("Repeat");

    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);

    conf.setMapperClass(Map.class);
    conf.setCombinerClass(Reduce.class);
    conf.setReducerClass(Reduce.class);

    conf.setInputFormat(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);

    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    DistributedCache.addCacheFile(
        new Path("/user/ss/cacheFiles/File1.txt").toUri(), conf);

    JobClient.runJob(conf);
  } // end of main

} // end of class


And I put my File1.txt and File2.txt into HDFS as follows:

$HADOOP_HOME/bin/hadoop fs -mkdir input
$HADOOP_HOME/bin/hadoop fs -mkdir cacheFiles
$HADOOP_HOME/bin/hadoop fs -put /u/home/File2.txt input
$HADOOP_HOME/bin/hadoop fs -put /u/home/File1.txt cacheFiles

My problem is that my code compiles fine, but the job just does not
proceed past the map 0% reduce 0% stage.

What am I doing wrong?

Any suggestion would be of great help.

Best,
SS

Re: Comparing input hdfs file to a distributed cache files

Posted by Shanu Sushmita <sh...@gmail.com>.
Thanks Sriram,

Many thanks for your prompt reply. I appreciate your inputs.

I am trying to compare two files: file1 (3 MB) and file2 (17 GB). I need to
read each line from file1, look for its occurrences in file2, and get the
frequencies.
I am new to Hadoop, so I am not sure if my approach is correct. Because of
the size of the files I couldn't process them fast enough using plain Java,
so I decided to try Hadoop. From what I read and learnt searching over the
internet and tutorials, if two files need to be compared, storing the
smaller one in the distributed cache is one approach to go about it.

For instance:
http://developer.yahoo.com/hadoop/tutorial/module5.html#auxdata
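
For the small hadoop/user/hello/world example at the top of this thread, my
understanding of that pattern is roughly the sketch below. This is only an
illustration, not my real code: the class name WordFilterMapper, the
whitespace tokenizing and the null check are things I made up for the sketch.

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashSet;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// Sketch: count, in the big job input, only those words that appear in the
// small cached file. Pair it with the usual summing reducer/combiner.
public class WordFilterMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, IntWritable> {

  private final static IntWritable one = new IntWritable(1);
  private final Text word = new Text();
  private final HashSet<String> wanted = new HashSet<String>();

  @Override
  public void configure(JobConf conf) {
    try {
      Path[] cacheFiles = DistributedCache.getLocalCacheFiles(conf);
      if (cacheFiles == null) {
        return; // nothing was registered in the DistributedCache
      }
      for (Path p : cacheFiles) {
        BufferedReader in = new BufferedReader(new FileReader(p.toString()));
        String line;
        while ((line = in.readLine()) != null) {
          wanted.add(line.trim());
        }
        in.close();
      }
    } catch (IOException ioe) {
      System.err.println("Could not read cache file: " + ioe);
    }
  }

  @Override
  public void map(LongWritable key, Text value,
      OutputCollector<Text, IntWritable> output, Reporter reporter)
      throws IOException {
    // The big file is the normal job input; emit (word, 1) only for words
    // that were listed in the small cached file.
    for (String token : value.toString().split("\\s+")) {
      if (wanted.contains(token)) {
        word.set(token);
        output.collect(word, one);
      }
    }
  }
}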

Am I wrong?
SS


On Fri, Jul 20, 2012 at 9:18 AM, Sriram Ramachandrasekaran <
sri.rams85@gmail.com> wrote:

> Hello,
> If I understand right, you are trying to run your map reduce on files that
> you shared via the Distributed Cache. The Distributed Cache is not generally
> meant for that. It is there in case your MR job needs other reference files,
> archives, native libs, etc. that need to be shared across your cluster. The
> suggestion would be to load the file that you want to run your word
> frequencies on to an HDFS path and then run the job off it. You will need
> to configure your MR job to pick its input files from that new HDFS
> location.
>
> This would be my approach. Other regulars in the forum will be better able
> to help you!
>
> -Sriram
>

Re: Comparing input hdfs file to a distributed cache files

Posted by Sriram Ramachandrasekaran <sr...@gmail.com>.
Hello,
If I understand right, you are trying to run your map reduce on files that
you shared via the Distributed Cache. The Distributed Cache is not generally
meant for that. It is there in case your MR job needs other reference files,
archives, native libs, etc. that need to be shared across your cluster. The
suggestion would be to load the file that you want to run your word
frequencies on to an HDFS path and then run the job off it. You will need
to configure your MR job to pick its input files from that new HDFS
location.
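
Roughly, I mean a driver along the lines of the untested sketch below,
reusing your RepeatFreqTest classes; the driver class name and the cache
path are placeholders I made up.

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class RepeatFreqDriver {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(RepeatFreqTest.class);
    conf.setJobName("Repeat");

    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);
    conf.setMapperClass(RepeatFreqTest.Map.class);
    conf.setCombinerClass(RepeatFreqTest.Reduce.class);
    conf.setReducerClass(RepeatFreqTest.Reduce.class);
    conf.setInputFormat(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);

    // The large (17 GB) file is ordinary HDFS job input, so it is split
    // across many map tasks: pass its HDFS path as args[0].
    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    // Only the small (3 MB) word list rides along in the DistributedCache.
    // The path below is a placeholder.
    DistributedCache.addCacheFile(
        new Path("/user/ss/cacheFiles/smallfile.txt").toUri(), conf);

    JobClient.runJob(conf);
  }
}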

This would be my approach. Other regulars in the forum will be better able
to help you!

-Sriram





-- 
It's just about how deep your longing is!

Re: Comparing input hdfs file to a distributed cache files

Posted by Shanu Shushmita <sh...@gmail.com>.
Hi,

I tried looking into the logs, and there is no error message that could have
indicated the cause of my map-reduce job being stuck at 0%.

But I wonder: does the number of nodes being used affect the speed of a
map-reduce job?

Earlier I was running it by requesting 2 nodes with 8 processors; now I tried
running it on 4 nodes and it works slightly faster, going from 0% map (where
it sat for over 6 hours) to about 1% map per hour.

Can this be the reason?

Best,
SS

On Jul 20, 2012, at 9:24 AM, Harsh J wrote:

> SS,
> 
> Is your job not progressing at all (i.e. continues to be at 0-progress
> for hours), or does it fail after zero progress?
> 
> I'd try adding logging to various important points of the map/reduce
> functions and see what's taking it so long, or if it's getting stuck at
> something. Logs are observable for each task attempt via the JT Web
> UI/etc..
> 
> -- 
> Harsh J


Re: Comparing input hdfs file to a distributed cache files

Posted by Shanu Sushmita <sh...@gmail.com>.
Is there any particular thing that I should look at in the logs?
Here are a few things I saw at the beginning of the log file:


2012-07-20 11:00:03,483 INFO org.apache.hadoop.metrics2.impl.MetricsConfig:
loaded properties from hadoop-metrics2.properties
2012-07-20 11:00:03,522 INFO
org.apache.hadoop.metrics2.impl.MetricsSourceAdapter: MBean for source
MetricsSystem,sub=Stats registered.
2012-07-20 11:00:03,523 INFO
org.apache.hadoop.metrics2.impl.MetricsSystemImpl: Scheduled snapshot
period at 10 second(s).
2012-07-20 11:00:03,523 INFO
org.apache.hadoop.metrics2.impl.MetricsSystemImpl: TaskTracker metrics
system started
2012-07-20 11:00:03,711 INFO
org.apache.hadoop.metrics2.impl.MetricsSourceAdapter: MBean for source ugi
registered.
2012-07-20 11:00:03,858 INFO org.mortbay.log: Logging to
org.slf4j.impl.Log4jLoggerAdapter(org.mortbay.log) via
org.mortbay.log.Slf4jLog
2012-07-20 11:00:03,902 INFO org.apache.hadoop.http.HttpServer: Added
global filtersafety
(class=org.apache.hadoop.http.HttpServer$QuotingInputFilter)
2012-07-20 11:00:03,921 INFO org.apache.hadoop.mapred.TaskLogsTruncater:
Initializing logs' truncater with mapRetainSize=-1 and reduceRetainSize=-1
2012-07-20 11:00:03,924 INFO org.apache.hadoop.mapred.TaskTracker: Starting
tasktracker with owner as ssushmit
2012-07-20 11:00:03,926 INFO org.apache.hadoop.mapred.TaskTracker: Good
mapred local directories are:
/work/3719788.1.hadoop.q/hadoop-ssushmit/mapred/local
2012-07-20 11:00:03,929 INFO org.apache.hadoop.util.NativeCodeLoader:
Loaded the native-hadoop library
2012-07-20 11:00:03,936 INFO
org.apache.hadoop.metrics2.impl.MetricsSourceAdapter: MBean for source jvm
registered.
2012-07-20 11:00:03,937 INFO
org.apache.hadoop.metrics2.impl.MetricsSourceAdapter: MBean for source
TaskTrackerMetrics registered.
2012-07-20 11:00:03,956 INFO org.apache.hadoop.ipc.Server: Starting
SocketReader
2012-07-20 11:00:03,958 INFO
org.apache.hadoop.metrics2.impl.MetricsSourceAdapter: MBean for source
RpcDetailedActivityForPort56578 registered.
2012-07-20 11:00:03,958 INFO
org.apache.hadoop.metrics2.impl.MetricsSourceAdapter: MBean for source
RpcActivityForPort56578 registered.
2012-07-20 11:00:03,961 INFO org.apache.hadoop.ipc.Server: IPC Server
Responder: starting
2012-07-20 11:00:03,962 INFO org.apache.hadoop.ipc.Server: IPC Server
listener on 56578: starting
2012-07-20 11:00:03,962 INFO org.apache.hadoop.ipc.Server: IPC Server
handler 0 on 56578: starting
2012-07-20 11:00:03,963 INFO org.apache.hadoop.ipc.Server: IPC Server
handler 1 on 56578: starting
2012-07-20 11:00:03,964 INFO org.apache.hadoop.ipc.Server: IPC Server
handler 2 on 56578: starting
2012-07-20 11:00:03,964 INFO org.apache.hadoop.ipc.Server: IPC Server
handler 3 on 56578: starting
2012-07-20 11:00:03,965 INFO org.apache.hadoop.ipc.Server: IPC Server
handler 4 on 56578: starting
2012-07-20 11:00:03,965 INFO org.apache.hadoop.ipc.Server: IPC Server
handler 5 on 56578: starting
2012-07-20 11:00:03,966 INFO org.apache.hadoop.ipc.Server: IPC Server
handler 6 on 56578: starting
2012-07-20 11:00:03,967 INFO org.apache.hadoop.ipc.Server: IPC Server
handler 7 on 56578: starting
2012-07-20 11:00:03,967 INFO org.apache.hadoop.ipc.Server: IPC Server
handler 8 on 56578: starting
2012-07-20 11:00:03,971 INFO org.apache.hadoop.mapred.TaskTracker: Starting
tracker tracker_n2177:localhost/127.0.0.1:56578
2012-07-20 11:00:04,075 INFO org.apache.hadoop.mapred.TaskTracker: Starting
thread: Map-events fetcher for all reduce tasks on tracker_n2177:localhost/
127.0.0.1:56578
2012-07-20 11:00:04,087 INFO org.apache.hadoop.util.ProcessTree: setsid
exited with exit code 0
2012-07-20 11:00:04,093 INFO org.apache.hadoop.mapred.TaskTracker:  Using
ResourceCalculatorPlugin :
org.apache.hadoop.util.LinuxResourceCalculatorPlugin@1fbbdd48
*2012-07-20 11:00:04,095 WARN org.apache.hadoop.mapred.TaskTracker:
TaskTracker's totalMemoryAllottedForTasks is -1. TaskMemoryManager is
disabled.*
*2012-07-20 11:00:04,123 INFO org.apache.hadoop.mapred.IndexCache:
IndexCache created with max memory = 10485760*
2012-07-20 11:00:04,131 INFO
org.apache.hadoop.metrics2.impl.MetricsSourceAdapter: MBean for source
ShuffleServerMetrics registered.
2012-07-20 11:00:04,133 INFO org.apache.hadoop.http.HttpServer: Port
returned by webServer.getConnectors()[0].getLocalPort() before open() is
-1. Opening the listener on 50060
2012-07-20 11:00:04,133 INFO org.apache.hadoop.http.HttpServer:
listener.getLocalPort() returned 50060
webServer.getConnectors()[0].getLocalPort() returned 50060
2012-07-20 11:00:04,134 INFO org.apache.hadoop.http.HttpServer: Jetty bound
to port 50060
2012-07-20 11:00:04,134 INFO org.mortbay.log: jetty-6.1.26
2012-07-20 11:00:04,197 WARN org.mortbay.log: Can't reuse
/tmp/Jetty_0_0_0_0_50060_task____.2vcltf, using
/tmp/Jetty_0_0_0_0_50060_task____.2vcltf_554887139184681705
2012-07-20 11:00:04,396 INFO org.mortbay.log: Started
SelectChannelConnector@0.0.0.0:50060
2012-07-20 11:00:04,396 INFO org.apache.hadoop.mapred.TaskTracker:
*FILE_CACHE_SIZE
for mapOutputServlet set to : 2000*
2012-07-20 11:00:04,402 INFO org.apache.hadoop.mapred.UserLogCleaner:
Adding job_201207171708_0001 for user-log deletion with
retainTimeStamp:1342893604098
2012-07-20 11:00:04,402 INFO org.apache.hadoop.mapred.UserLogCleaner:
Adding job_201207181112_0001 for user-log deletion with
retainTimeStamp:1342893604098
2012-07-20 11:00:04,402 INFO org.apache.hadoop.mapred.UserLogCleaner:
Adding job_201207181112_0002 for user-log deletion with
retainTimeStamp:1342893604098
2012-07-20 11:00:04,402 INFO org.apache.hadoop.mapred.UserLogCleaner:
Adding job_201207181157_0001 for user-log deletion with
retainTimeStamp:1342893604098
2012-07-20 11:00:04,402 INFO org.apache.hadoop.mapred.UserLogCleaner:
Adding job_201207181220_0001 for user-log deletion with
retainTimeStamp:1342893604098
2012-07-20 11:00:04,402 INFO org.apache.hadoop.mapred.UserLogCleaner:
Adding job_201207181505_0001 for user-log deletion with
retainTimeStamp:1342893604098
2012-07-20 11:00:04,402 INFO org.apache.hadoop.mapred.UserLogCleaner:
Adding job_201207181756_0001 for user-log deletion with
retainTimeStamp:1342893604098
2012-07-20 11:00:04,402 INFO org.apache.hadoop.mapred.UserLogCleaner:
Adding job_201207181809_0001 for user-log deletion with
retainTimeStamp:1342893604098
2012-07-20 11:00:04,402 INFO org.apache.hadoop.mapred.UserLogCleaner:
Adding job_201207191117_0001 for user-log deletion with
retainTimeStamp:1342893604098
2012-07-20 11:00:04,402 INFO org.apache.hadoop.mapred.UserLogCleaner:
Adding job_201207191156_0001 for user-log deletion with
retainTimeStamp:1342893604098
:

And now, toward the end, it has been logging this for the past hour:

2012-07-20 12:02:32,953 INFO org.apache.hadoop.mapred.TaskTracker:
attempt_201207201100_0001_m_000002_0 0.011960195%
hdfs://n2177:9000/user/ssushmit/input/allDyads.txt:134217728+67108864
2012-07-20 12:02:32,961 INFO org.apache.hadoop.mapred.TaskTracker:
attempt_201207201100_0001_m_000005_0 0.012327315%
hdfs://n2177:9000/user/ssushmit/input/allDyads.txt:335544320+67108864
2012-07-20 12:02:32,990 INFO org.apache.hadoop.mapred.TaskTracker:
attempt_201207201100_0001_m_000003_0 0.012126452%
hdfs://n2177:9000/user/ssushmit/input/allDyads.txt:201326592+67108864
2012-07-20 12:02:33,070 INFO org.apache.hadoop.mapred.TaskTracker:
attempt_201207201100_0001_m_000004_0 0.0122014%
hdfs://n2177:9000/user/ssushmit/input/allDyads.txt:268435456+67108864
2012-07-20 12:02:35,406 INFO org.apache.hadoop.mapred.TaskTracker:
attempt_201207201100_0001_m_000006_0 0.01199097%
hdfs://n2177:9000/user/ssushmit/input/allDyads.txt:402653184+67108864
2012-07-20 12:02:35,420 INFO org.apache.hadoop.mapred.TaskTracker:
attempt_201207201100_0001_m_000001_0 0.012117211%
hdfs://n2177:9000/user/ssushmit/input/allDyads.txt:67108864+67108864
2012-07-20 12:02:35,872 INFO org.apache.hadoop.mapred.TaskTracker:
attempt_201207201100_0001_m_000007_0 0.012785331%
hdfs://n2177:9000/user/ssushmit/input/allDyads.txt:469762048+67108864
2012-07-20 12:02:35,955 INFO org.apache.hadoop.mapred.TaskTracker:
attempt_201207201100_0001_m_000000_0 0.012768%
hdfs://n2177:9000/user/ssushmit/input/allDyads.txt:0+67108864
2012-07-20 12:02:36,012 INFO org.apache.hadoop.mapred.TaskTracker:
attempt_201207201100_0001_m_000005_0 0.012339072%
hdfs://n2177:9000/user/ssushmit/input/allDyads.txt:335544320+67108864
2012-07-20 12:02:36,020 INFO org.apache.hadoop.mapred.TaskTracker:
attempt_201207201100_0001_m_000002_0 0.01197006%
hdfs://n2177:9000/user/ssushmit/input/allDyads.txt:134217728+67108864
2012-07-20 12:02:36,039 INFO org.apache.hadoop.mapred.TaskTracker:
attempt_201207201100_0001_m_000003_0 0.012135348%
hdfs://n2177:9000/user/ssushmit/input/allDyads.txt:201326592+67108864
2012-07-20 12:02:36,133 INFO org.apache.hadoop.mapred.TaskTracker:
attempt_201207201100_0001_m_000004_0 0.01221265%
hdfs://n2177:9000/user/ssushmit/input/allDyads.txt:268435456+67108864

And my job is hung at:
12/07/20 11:07:16 INFO mapred.JobClient:  map 0% reduce 0%

Why is it taking so much time here?

The max heap size looks fine. I am wondering about the entries marked with
asterisks above. Do you think those could be the problem?

Sorry for asking such basic questions. I am just clueless right now.

SS


On Fri, Jul 20, 2012 at 9:46 AM, Shanu Sushmita
<sh...@gmail.com> wrote:

> Thanks Harsh,
>
> I don't get any error message. It just halts at 0% map 0% reduce forever.
> However, it works (though still slowly) if I reduce the size of the cache
> file to less than 100 KB, but it just halts if I increase the size of the
> cache file. Even 120 KB is not working :-(
>
>
> SS
>
> On 20 Jul 2012, at 09:24, Harsh J wrote:
>
>  SS,
>>
>> Is your job not progressing at all (i.e. continues to be at 0-progress
>> for hours), or does it fail after zero progress?
>>
>> I'd try adding logging to various important points of the map/reduce
>> functions and see what's taking it so long, or if it's getting stuck at
>> something. Logs are observable for each task attempt via the JT Web
>> UI/etc..
>>
>> --
>> Harsh J
>>
>
>

Re: Comparing input hdfs file to a distributed cache files

Posted by Shanu Sushmita <sh...@gmail.com>.
Thanks Harsh,

I don't get any error message. It just halts at 0% map 0% reduce forever.
However, it works (though still slowly) if I reduce the size of the cache
file to less than 100 KB, but it just halts if I increase the size of the
cache file. Even 120 KB is not working :-(
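
One more thing I am wondering about, though this is only my own guess: my
map() loops over the entire cached set for every input line, so the work
grows with the size of the cache file. Would it help to index the cached
lines by their dyad field once in configure() and then just look them up in
map()? Below is a rough sketch of the parts of my Map class that would
change (the names dyadsByKey and dyadKey are made up, and I am assuming the
same wildcard imports as in my posted code):

private final HashMap<String, ArrayList<String>> dyadsByKey =
    new HashMap<String, ArrayList<String>>();

private void loadDyads(Path dyadPath) {
  try {
    BufferedReader wordReader =
        new BufferedReader(new FileReader(dyadPath.toString()));
    String line;
    while ((line = wordReader.readLine()) != null) {
      // Key each cached line by its (lower-cased) dyad field so that map()
      // can look it up instead of scanning every cached line.
      String dyadKey = line.split("\\|")[1].trim().toLowerCase();
      ArrayList<String> bucket = dyadsByKey.get(dyadKey);
      if (bucket == null) {
        bucket = new ArrayList<String>();
        dyadsByKey.put(dyadKey, bucket);
      }
      bucket.add(line);
    }
    wordReader.close();
  } catch (IOException ioe) {
    System.err.println("IOException reading from distributed cache");
  }
}

@Override
public void map(LongWritable key, Text value,
    OutputCollector<Text, IntWritable> output, Reporter reporter)
    throws IOException {
  String[] tokens = value.toString().split("\\|");
  String ut1 = tokens[0].trim();
  String dyad1 = tokens[1].trim().toLowerCase();
  int y1 = Integer.parseInt(tokens[2].trim());

  // Lower-casing the keys keeps the case-insensitive dyad match I had.
  ArrayList<String> candidates = dyadsByKey.get(dyad1);
  if (candidates == null) {
    return; // this dyad never occurs in the cached sample file
  }
  for (String setline : candidates) {
    String[] tokens2 = setline.split("\\|");
    if (!ut1.equalsIgnoreCase(tokens2[0].trim())
        && y1 <= Integer.parseInt(tokens2[2].trim())) {
      word.set(setline);
      output.collect(word, one);
    }
  }
}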


SS
On 20 Jul 2012, at 09:24, Harsh J wrote:

> SS,
>
> Is your job not progressing at all (i.e. continues to be at 0-progress
> for hours), or does it fail after zero progress?
>
> I'd try adding logging to various important points of the map/reduce
>> functions and see what's taking it so long, or if it's getting stuck at
> something. Logs are observable for each task attempt via the JT Web
> UI/etc..
>
>
>
>
> -- 
> Harsh J


Re: Comparing input hdfs file to a distributed cache files

Posted by Harsh J <ha...@cloudera.com>.
SS,

Is your job not progressing at all (i.e. continues to be at 0-progress
for hours), or does it fail after zero progress?

I'd try adding logging to various important points of the map/reduce
functions and see what's taking it so long, or if it's getting stuck at
something. Logs are observable for each task attempt via the JT Web
UI/etc..
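
For instance, something along the lines of the rough, untested sketch below
inside your map() would make the slow part visible in the task logs and in
the job counters (the counter group/names and the 100000 interval are
arbitrary choices of mine):

// Rough sketch: report progress and count work done so a long-running
// map() shows up in the task logs and in the JobTracker web UI counters.
@Override
public void map(LongWritable key, Text value,
    OutputCollector<Text, IntWritable> output, Reporter reporter)
    throws IOException {
  reporter.incrCounter("RepeatFreqTest", "MAP_INPUT_LINES", 1);

  long comparisons = 0;
  for (String setline : dyads) {
    // ... existing comparison and output.collect() logic on setline ...
    comparisons++;
    if (comparisons % 100000 == 0) {
      // Tell the framework this task is still alive, and log where it is.
      reporter.progress();
      System.err.println("Compared " + comparisons
          + " cached lines for input offset " + key.get());
    }
  }
  reporter.incrCounter("RepeatFreqTest", "CACHE_COMPARISONS", comparisons);
}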




-- 
Harsh J