Posted to user@hbase.apache.org by David Poisson <Da...@ca.fujitsu.com> on 2013/05/31 22:19:56 UTC

Best practices for loading data into hbase

Hi,
     We are still very new to all of this hbase/hadoop/mapreduce stuff. We are looking for the best practices that fit our requirements. We are currently using the latest Cloudera VMware image (single node) for our development tests.

The problem is as follows: 

We have multiple sources in different formats (xml, csv, etc.), which are dumps of existing systems. As one might expect, there will be an initial "import" of the data into hbase,
and afterwards the systems will most likely dump whatever data they have accumulated since the initial import or since the last data dump. We would also require an
intermediary step, so that we can ensure all of a source's data can be successfully processed; something which would look like:

XML data file --(MR JOB)--> Intermediate (hbase table or hfile?) --(MR JOB)--> production tables in hbase

We're guessing we can't use something like a transaction in hbase, so we thought about using an intermediate step: is that how things are normally done?

As we import data into hbase, we will be populating several tables that link data parts together (account X in System 1 == account Y in System 2) as tuples in 3 tables. Currently,
this is being done by a mapreduce job which reads the XML source and uses MultiTableOutputFormat to "put" data into those 3 hbase tables. This method
isn't that fast on our test sample (2 minutes for 5 MB), so we are looking at optimizing the loading of data.
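
To make that concrete, the mapper boils down to something like the sketch below (not our actual code; the table names, column family, qualifiers and the pretend one-record-per-line input are made up for illustration, and only two of the three tables are shown):

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Simplified sketch of a mapper that writes to several tables through
// MultiTableOutputFormat: the output key names the target table, the value
// is the Put. Table, family and qualifier names are placeholders.
public class AccountLinkMapper
    extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

  private static final ImmutableBytesWritable LINKS_BY_SYS1 =
      new ImmutableBytesWritable(Bytes.toBytes("links_by_system1"));
  private static final ImmutableBytesWritable LINKS_BY_SYS2 =
      new ImmutableBytesWritable(Bytes.toBytes("links_by_system2"));

  @Override
  protected void map(LongWritable offset, Text line, Context context)
      throws IOException, InterruptedException {
    // Pretend each input record is "system1Account,system2Account".
    String[] parts = line.toString().split(",");
    if (parts.length != 2) {
      return; // skip malformed records
    }

    Put forward = new Put(Bytes.toBytes(parts[0]));
    forward.add(Bytes.toBytes("link"), Bytes.toBytes("sys2"), Bytes.toBytes(parts[1]));
    context.write(LINKS_BY_SYS1, forward);

    Put reverse = new Put(Bytes.toBytes(parts[1]));
    reverse.add(Bytes.toBytes("link"), Bytes.toBytes("sys1"), Bytes.toBytes(parts[0]));
    context.write(LINKS_BY_SYS2, reverse);
  }
}

The driver sets job.setOutputFormatClass(MultiTableOutputFormat.class), so each context.write() becomes a put against whichever table the key names.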

We have been researching bulk loading, but we are unsure of a couple of things:
Once we process an xml file and populate our 3 "production" hbase tables, could we bulk load another xml file and append the new data to our 3 tables, or would it write over what was written before?
In order to bulk load, we need to output a file using HFileOutputFormat. Since MultiHFileOutputFormat doesn't seem to officially exist yet (still in the works, right?), should we process our input xml file
with 3 MapReduce jobs instead of 1 and output an hfile for each, which would then become our intermediate step (if all 3 hfiles were created without errors, then the process was successful: bulk load
into hbase)? Can you experiment with bulk loading on a vmware image? We're experiencing problems with the partitions file not being found, with the following exception:

java.lang.Exception: java.lang.IllegalArgumentException: Can't read partitions file
	at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:404)
Caused by: java.lang.IllegalArgumentException: Can't read partitions file
	at org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner.setConf(TotalOrderPartitioner.java:108)
	at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:70)
	at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:130)
	at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:588)
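
For context, the bulk-load job we are experimenting with follows what we understand to be the usual HFileOutputFormat pattern; here is a trimmed-down sketch of a single-table driver (the mapper, table name and HDFS paths are placeholders, not our real code):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Trimmed-down single-table bulk load driver (sketch only).
public class BulkLoadDriver {

  // Placeholder mapper: in reality this would parse the XML dump. Here it
  // just pretends each input line is "rowKey,value".
  public static class XmlToPutMapper
      extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
    @Override
    protected void map(LongWritable offset, Text line, Context ctx)
        throws IOException, InterruptedException {
      String[] parts = line.toString().split(",");
      if (parts.length != 2) return;
      Put put = new Put(Bytes.toBytes(parts[0]));
      put.add(Bytes.toBytes("link"), Bytes.toBytes("sys2"), Bytes.toBytes(parts[1]));
      ctx.write(new ImmutableBytesWritable(put.getRow()), put);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = new Job(conf, "xml-to-hfiles");
    job.setJarByClass(BulkLoadDriver.class);
    job.setMapperClass(XmlToPutMapper.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(Put.class);

    FileInputFormat.addInputPath(job, new Path("/staging/in/dump.xml"));
    Path hfileDir = new Path("/staging/hfiles/links_by_system1");
    FileOutputFormat.setOutputPath(job, hfileDir);

    // Wires up HFileOutputFormat, the TotalOrderPartitioner and a partitions
    // file derived from the table's current region boundaries.
    HTable table = new HTable(conf, "links_by_system1");
    HFileOutputFormat.configureIncrementalLoad(job, table);

    if (job.waitForCompletion(true)) {
      // Move the generated HFiles into the live table only if the job succeeded.
      new LoadIncrementalHFiles(conf).doBulkLoad(hfileDir, table);
    }
  }
}

The plan would be to run the LoadIncrementalHFiles step only once the HFiles for all three tables have been generated cleanly, which is the intermediate step we described above.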

We also tried another idea to speed things up: what if, instead of doing individual puts, we passed a list of puts to put() (e.g. htable.put(putList))? Internally in hbase, would there be less overhead vs. multiple
calls to put()? It seems to be faster; however, since we're not using context.write, I'm guessing this will lead to problems later on, right?
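
What we tried looks roughly like the following (a sketch only; the table name, column family, qualifier and the map of records are placeholders):

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

// Sketch of batching puts instead of one call per record.
public class BatchedPutExample {
  public static void load(Map<String, String> records) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, "links_by_system1");
    try {
      List<Put> batch = new ArrayList<Put>();
      for (Map.Entry<String, String> r : records.entrySet()) {
        Put p = new Put(Bytes.toBytes(r.getKey()));
        p.add(Bytes.toBytes("link"), Bytes.toBytes("sys2"), Bytes.toBytes(r.getValue()));
        batch.add(p);
        if (batch.size() >= 1000) {   // flush in chunks to bound client memory
          table.put(batch);           // puts are grouped by region server
          batch.clear();
        }
      }
      if (!batch.isEmpty()) {
        table.put(batch);
      }
    } finally {
      table.close();
    }
  }
}

From what we have read, table.setAutoFlush(false) plus a larger write buffer should give a similar batching effect while letting the client decide when to flush.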

Turning off WAL on puts to speed things up isn't an option, since data loss would be unacceptable, even if the chances of a failure occurring are slim.
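
(Just so it is clear what we mean, the per-Put switch we are ruling out is roughly the following; the row key is a placeholder:)

// For reference only -- the per-Put switch we are deliberately NOT using:
Put p = new Put(Bytes.toBytes("some-row-key"));
p.setWriteToWAL(false);   // skips the write-ahead log, so edits not yet flushed
                          // from the MemStore are lost if a region server dies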

Thanks, David

Re: Best practices for loading data into hbase

Posted by Mohammad Tariq <do...@gmail.com>.
I am sorry to barge in when heavyweights are already involved here. But,
just out of curiosity, why don't you use Sqoop <http://sqoop.apache.org/> to
import the data directly from your existing systems into HBase, instead of
first taking the dump and then doing the import? Sqoop allows us to do
incremental imports as well.

Pardon me if this sounds childish.

Warm Regards,
Tariq
cloudfront.blogspot.com


On Sat, Jun 1, 2013 at 1:56 AM, Ted Yu <yu...@gmail.com> wrote:

> bq. Once we process an xml file and we populate our 3 "production" hbase
> tables, could we bulk load another xml file and append this new data to our
> 3 tables or would it write over what was written before?
>
> You can bulk load another XML file.
>
> bq. should we process our input xml file with 3 MapReduce jobs instead of 1
>
> You don't need to use 3 jobs.
>
> Looks like you were using CDH. Mind telling us the version number for HBase
> and hadoop ?
>
> Thanks

Re: Best practices for loading data into hbase

Posted by Ted Yu <yu...@gmail.com>.
bq. Once we process an xml file and we populate our 3 "production" hbase
tables, could we bulk load another xml file and append this new data to our
3 tables or would it write over what was written before?

You can bulk load another XML file.

bq. should we process our input xml file with 3 MapReduce jobs instead of 1

You don't need to use 3 jobs.

Looks like you were using CDH. Mind telling us the version number for HBase
and hadoop ?

Thanks


RE: Best practices for loading data into hbase

Posted by David Poisson <Da...@ca.fujitsu.com>.
Hi,
     Thanks to everyone that replied!

@Tariq: Not childish at all. We don't have direct access to the database; rather, we will be going through a web service to obtain dumps of the data. This is why we are not using Sqoop (unless Sqoop supports such an operation, but not that I'm aware of).

@Ted: We're using CDH4:
rpm -q hbase
hbase-0.94.2+202-1.cdh4.2.0.p0.11.el6.x86_64
rpm -q hadoop
hadoop-2.0.0+922-1.cdh4.2.0.p0.12.el6.x86_64

@J-D: I've started researching how to set up pseudo-distributed mode on my vmware as per your suggestion; however, I'm having a hard time connecting to my JobTracker.
There's nothing in iptables, nothing in hosts.deny.
I can see something listening on that port, but I have a feeling it's only listening on 127.0.0.1:8020 and not 0.0.0.0:8020:

[cloudera@locahost ~]$ netstat -lnt  | grep 8020
tcp        0      0 127.0.0.1:8020              0.0.0.0:*                   LISTEN

I can do the following: telnet 127.0.0.1 8020
However the following fails: telnet 10.9.2.194 8020

When I run my MapReduce job, here are the last few lines of the output (for clarity):
13/06/05 10:33:50 INFO compress.CodecPool: Got brand-new compressor [.deflate]
13/06/05 10:33:50 INFO mapreduce.HFileOutputFormat: Incremental table output configured.
13/06/05 10:36:58 WARN conf.Configuration: session.id is deprecated. Instead, use dfs.metrics.session-id
13/06/05 10:36:58 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
13/06/05 10:37:16 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
13/06/05 10:37:16 WARN mapred.JobClient: No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
13/06/05 10:37:30 INFO mapred.JobClient: Cleaning up the staging area file:/tmp/hadoop-cloudera/mapred/staging/cloudera283903860/.staging/job_local283903860_0001
13/06/05 10:37:30 ERROR security.UserGroupInformation: PriviledgedActionException as:cloudera (auth:SIMPLE) cause:java.net.ConnectException: Call From localhost.localdomain/127.0.0.1 to 10.9.2.194:8020 failed on connection exception: java.net.ConnectException: Connection refused; For more details see:  http://wiki.apache.org/hadoop/ConnectionRefused
13/06/05 10:37:30 ERROR sourcestaging.ReducerXML: java.net.ConnectException: Call From localhost.localdomain/127.0.0.1 to 10.9.2.194:8020 failed on connection exception: java.net.ConnectException: Connection refused; For more details see: http://wiki.apache.org/hadoop/ConnectionRefused

SSH keys have been generated, so I can ssh from my vmware into my vmware as user cloudera without a password.

The following are set in the configs (as per http://hadoop.apache.org/docs/r1.1.1/single_node_setup.html#PseudoDistributed):
conf/core-site.xml:

<configuration>
     <property>
         <name>fs.defaultFS</name> <!-- I changed fs.default.name to fs.defaultFS; I tried them both and neither allowed me to connect -->
         <value>hdfs://localhost:9000</value>
     </property>
</configuration>


conf/hdfs-site.xml:

<configuration>
     <property>
         <name>dfs.replication</name>
         <value>1</value>
     </property>
</configuration>


conf/mapred-site.xml:

<configuration>
     <property>
         <name>mapred.job.tracker</name>
         <value>localhost:9001</value>
     </property>
</configuration>


Here's my /etc/hosts file:
[cloudera@localhost /etc/alternatives/hadoop-conf]$ cat /etc/hosts
127.0.0.1		localhost.localdomain localhost
::1		localhost6.localdomain6 localhost6

We have a pretty strict proxy here; could it be interfering? Other than that, my VM's networking is set to bridged, if that makes any difference. Mind you, I'm trying to connect from my VM to my VM.

I'm at a loss here. I could really use some guidance. Thanks!

David


Re: Best practices for loading data into hbase

Posted by Jean-Daniel Cryans <jd...@apache.org>.
You cannot use the local job tracker (that is, the one that gets
started if you don't have one running) with the TotalOrderPartitioner.

You'll need to fully install hadoop on that vmware node.

Google that error to find other relevant comments.
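
If it helps, a quick guard in the driver along these lines (assuming MR1-style configuration, where mapred.job.tracker defaults to "local"; class and method names here are just for illustration) makes that failure mode obvious up front:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class BulkLoadPreflight {
  public static void checkNotLocal() {
    Configuration conf = HBaseConfiguration.create();
    // configureIncrementalLoad() writes a partitions file and ships it through
    // the distributed cache, which the LocalJobRunner can't make use of.
    if ("local".equals(conf.get("mapred.job.tracker", "local"))) {
      throw new IllegalStateException(
          "HFileOutputFormat bulk loads need a real JobTracker, not the local job runner");
    }
  }
}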

J-D
