Posted to user@hive.apache.org by Nikolay Voronchikhin <nv...@gmail.com> on 2016/05/26 08:11:27 UTC

Fwd: How to run large Hive queries in PySpark 1.2.1

Hi PySpark users,

We need to be able to run large Hive queries in PySpark 1.2.1. Users run
PySpark on an edge node and submit jobs to a cluster that allocates YARN
resources to the clients.
We are using MapR as the Hadoop distribution, with Hive 0.13 and Spark
1.2.1.


Currently, our process for writing queries works only for small result
sets, for example:
from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)
results = sqlContext.sql("select column from database.table limit 10").collect()
results
<outputs resultset here>


How do I save the result of the HiveQL query to an RDD first, and then
output the results?

This is the error I get when running a query that returns 400,000 rows:
from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)
results = sqlContext.sql("select column from database.table").collect()
results
...

/path/to/mapr/spark/spark-1.2.1/python/pyspark/sql.py in collect(self)
   1976         """
   1977         with SCCallSiteSync(self.context) as css:
-> 1978             bytesInJava = self._jschema_rdd.baseSchemaRDD().collectToPython().iterator()
   1979         cls = _create_cls(self.schema())
   1980         return map(cls, self._collect_iterator_through_file(bytesInJava))

/path/to/mapr/spark/spark-1.2.1/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    536         answer = self.gateway_client.send_command(command)
    537         return_value = get_return_value(answer, self.gateway_client,
--> 538                 self.target_id, self.name)
    539
    540         for temp_arg in temp_args:

/path/to/mapr/spark/spark-1.2.1/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    298                 raise Py4JJavaError(
    299                     'An error occurred while calling {0}{1}{2}.\n'.
--> 300                     format(target_id, '.', name), value)
    301             else:
    302                 raise Py4JError(

Py4JJavaError: An error occurred while calling o76.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Exception while getting task result: java.io.IOException: Failed to connect to cluster_node/IP_address:port
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
	at scala.Option.foreach(Option.scala:236)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
	at akka.dispatch.Mailbox.run(Mailbox.scala:220)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)




Ideally, the query in this example should return the full 400,000-row
result set.


Thanks for your help,
Nikolay Voronchikhin
https://www.linkedin.com/in/nvoronchikhin
E-mail: nvoronchikhin@gmail.com

Re: How to run large Hive queries in PySpark 1.2.1

Posted by Nikolay Voronchikhin <nv...@gmail.com>.
Hi Jörn,

We will be upgrading to MapR 5.1, Hive 1.2, and Spark 1.6.1 at the end of
June.

In the meantime, can this still be done with these versions?
There is no firewall issue, since the edge nodes and cluster nodes are
hosted in the same location with the same NFS mount.
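
One approach that may work on these versions is to skip collect() and let
the executors write the rows out themselves, so the full result set never
has to travel back to the edge node. The sketch below is only an
illustration under assumptions: save_query_result and the output directory
are hypothetical names, the object returned by sql() is assumed to behave
like an RDD (as a SchemaRDD does in Spark 1.2.x, with map and
saveAsTextFile), and every column value is assumed to convert cleanly with
str().

```python
def save_query_result(sql_context, query, output_dir):
    """Run `query` and have the executors write the rows to `output_dir`.

    Hypothetical helper: nothing is collected on the driver, so the
    result size is not limited by what the edge node can pull back.
    """
    # sql() is lazy; no rows are computed or moved at this point.
    rows = sql_context.sql(query)
    # Turn each row into one tab-separated line of text.
    lines = rows.map(lambda row: "\t".join(str(value) for value in row))
    # Each executor writes its own part-file under output_dir.
    lines.saveAsTextFile(output_dir)
    return output_dir

# Possible usage from the PySpark shell (the path is an assumption):
# save_query_result(sqlContext, "select column from database.table",
#                   "/tmp/query_output")
```

Note that saveAsTextFile normally refuses to write into a directory that
already exists, so the output directory should be a fresh one. The
part-files can then be read from the cluster file system in whatever way
suits the result size.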



On Thu, May 26, 2016 at 1:34 AM, Jörn Franke <jo...@gmail.com> wrote:

> Both have outdated versions, usually one can support you better if you
> upgrade to the newest.
> Firewall could be an issue here.

Re: How to run large Hive queries in PySpark 1.2.1

Posted by Jörn Franke <jo...@gmail.com>.
Both are outdated versions; it is usually easier to get support if you upgrade to the latest releases.
A firewall could be the issue here.
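
A quick way to test the firewall theory is to check, from the edge node, whether the host and port named in the "Failed to connect to" message are reachable at all. As far as I understand Spark's behavior, small task results are sent to the driver inline, while larger ones are fetched by the driver from the executor over a separate connection, which would fit the pattern of the limit 10 query working and the 400,000-row query failing. The sketch below uses only the Python standard library; can_connect is a hypothetical helper name, and the host and port must be taken from your own error message.

```python
import socket

def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        conn = socket.create_connection((host, port), timeout)
    except socket.error:
        # Covers refused connections, timeouts, and name-resolution failures.
        return False
    conn.close()
    return True

# Possible usage on the edge node, with the values from the error message
# substituted for the placeholders:
# can_connect("cluster_node", port)
```

A False result for a port that an executor is listening on would point to a network or firewall restriction between the edge node and the cluster node. A True result only shows that the port is reachable at the moment of the check.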

