Posted to user@spark.apache.org by diplomatic Guru <di...@gmail.com> on 2015/07/22 20:11:29 UTC

Performance issue with Spark's foreachPartition method

Hello all,

We are having a major performance issue with Spark, which is holding us
back from going live.

We have a job that carries out computation on log files and writes the
results into an Oracle DB.

The reducer 'reduceByKey' has been set to a parallelism of 4 as we don't
want to establish too many DB connections.

We then call foreachPartition on the pair RDD produced by reduceByKey.
Within foreachPartition we establish a DB connection, iterate over the
results, prepare the Oracle statement for batch insertion, commit the
batch and close the connection. All of this works fine.

However, when we execute the job to process 12GB of data, it takes forever
to complete, especially at the foreachPartition stage.

We submitted the job with 6 executors, 2 cores, and 6GB memory of which 0.3
is assigned to spark.storage.memoryFraction.

The job is taking about 50 minutes to complete, which is not ideal. I'm not
sure how we could improve the performance. I've provided the main body of
the code; please take a look and advise:

From Driver:

reduceResultsRDD.foreachPartition(new DB.InsertFunction(
dbuser,dbpass,batchsize));


DB class:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Iterator;

import org.apache.spark.api.java.function.VoidFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import scala.Tuple2;

public class DB {
    private static final Logger logger = LoggerFactory.getLogger(DB.class);

    public static class InsertFunction implements
            VoidFunction<Iterator<Tuple2<String, String>>> {

        private static final long serialVersionUID = 999955766876878L;
        private String dbuser = "";
        private String dbpass = "";
        private int batchsize;

        public InsertFunction(String dbuser, String dbpass, int batchsize) {
            super();
            this.dbuser = dbuser;
            this.dbpass = dbpass;
            this.batchsize = batchsize;
        }

        @Override
        public void call(Iterator<Tuple2<String, String>> results) {
            Connection connect = null;
            PreparedStatement pstmt = null;
            try {
                // getDBConnection is not shown in this post.
                connect = getDBConnection(dbuser, dbpass);

                int count = 0;

                // Fall back to a default batch size if none was supplied.
                if (batchsize <= 0) {
                    batchsize = 10000;
                }

                pstmt = connect
                        .prepareStatement("MERGE INTO SOME TABLE IF RECORD FOUND, IF NOT INSERT");

                while (results.hasNext()) {

                    Tuple2<String, String> kv = results.next();
                    String[] data = kv._1.concat("," + kv._2).split(",");

                    pstmt.setString(1, data[0].toString());
                    pstmt.setString(2, data[1].toString());
                    .....

                    pstmt.addBatch();

                    count++;

                    // Flush and commit once the batch is full.
                    if (count == batchsize) {
                        logger.info("BulkCount : " + count);
                        pstmt.executeBatch();
                        connect.commit();
                        count = 0;
                    }

                    // Note: this executeBatch()/commit() pair sits inside the
                    // while loop, so it runs on every iteration.
                    pstmt.executeBatch();
                    connect.commit();

                }

                // Flush and commit whatever is left after the loop.
                pstmt.executeBatch();
                connect.commit();

            } catch (Exception e) {
                logger.error("InsertFunction error: " + e.getMessage(), e);
            } finally {
                // close() throws the checked SQLException, so both calls are guarded.
                try {
                    if (pstmt != null) {
                        pstmt.close();
                    }
                    if (connect != null) {
                        connect.close();
                    }
                } catch (SQLException e) {
                    logger.error("InsertFunction Connection Close error: "
                            + e.getMessage());
                }
            }
        }

    }
}
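
One thing worth checking in the loop above: as posted, executeBatch() and
commit() also run after the if block on every pass through the while loop,
so each row is sent and committed on its own regardless of batchsize. The
following is a minimal sketch, not the poster's code, of a loop that flushes
only when the batch is full and once more at the end. BatchWriter and
CountingWriter are hypothetical stand-ins for the PreparedStatement and
Connection calls, so the example runs without a database.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class BatchCommitSketch {

    /** Hypothetical stand-in for addBatch() and executeBatch()/commit(). */
    interface BatchWriter {
        void addBatch(String row);
        void executeAndCommit();
    }

    /** Test double that only counts calls; a real writer would wrap JDBC objects. */
    static class CountingWriter implements BatchWriter {
        int rowsAdded = 0;
        int commits = 0;

        @Override
        public void addBatch(String row) { rowsAdded++; }

        @Override
        public void executeAndCommit() { commits++; }
    }

    /** Adds every row to the batch, flushing only when batchSize rows are pending. */
    static void writeInBatches(Iterator<String> rows, int batchSize, BatchWriter writer) {
        int pending = 0;
        while (rows.hasNext()) {
            writer.addBatch(rows.next());
            pending++;
            if (pending == batchSize) {
                writer.executeAndCommit();
                pending = 0;
            }
        }
        // Flush the final partial batch, if any rows are still pending.
        if (pending > 0) {
            writer.executeAndCommit();
        }
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList("a,1", "b,2", "c,3", "d,4", "e,5");
        CountingWriter writer = new CountingWriter();
        writeInBatches(rows.iterator(), 2, writer);
        // prints "5 rows, 3 commits"
        System.out.println(writer.rowsAdded + " rows, " + writer.commits + " commits");
    }
}
```

With five rows and a batch size of two, the writer commits three times (two
full batches and one partial), rather than once per row.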

Re: Performance issue with Spark's foreachPartition method

Posted by diplomatic Guru <di...@gmail.com>.
Thanks Robin for your reply.

I'm pretty sure that writing to Oracle is what takes the time, as writing to
HDFS takes only ~5 minutes.

The job is writing about ~5 million records. I've set the job to call
executeBatch() when the batch size reaches 200,000 records, so I assume
that commit will be invoked for every 200K batch. In this case, it should
only call commit 25 times; is this too much? I wouldn't want to increase
the batch size any further as it may cause Java heap issues. I do not have
much knowledge of the Oracle side, so any advice on the configuration would
be appreciated.
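
The figure of 25 follows from 5,000,000 / 200,000, but it assumes a single
counter across all records. In the posted code the counter lives inside
call(), so each partition counts separately and the total is the sum of the
per-partition ceilings. A rough sketch of that arithmetic, assuming
(hypothetically) that the records are spread evenly across the 4 reduce
partitions; commitsFor is an illustrative helper, not part of the job:

```java
class CommitCountSketch {

    /** Commits needed for one counter: ceil(records / batchSize). */
    static long commitsFor(long records, long batchSize) {
        return (records + batchSize - 1) / batchSize;
    }

    public static void main(String[] args) {
        long totalRecords = 5_000_000L; // "about ~5 million" from the message
        long batchSize = 200_000L;      // batch size from the message
        int partitions = 4;             // reduceByKey parallelism from the original post

        // One global counter: 5,000,000 / 200,000
        System.out.println(commitsFor(totalRecords, batchSize)); // prints 25

        // One counter per partition, assuming an even split of 1,250,000 each:
        // ceil(1,250,000 / 200,000) = 7 commits per partition
        long perPartition = totalRecords / partitions;
        System.out.println(partitions * commitsFor(perPartition, batchSize)); // prints 28
    }
}
```

Either way the number stays in the tens, provided the loop commits only when
the counter reaches the batch size.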

Thanks,

Raj





On 22 July 2015 at 20:20, Robin East <ro...@xense.co.uk> wrote:

> The first question I would ask is have you determined whether you have a
> performance issue writing to Oracle? In particular how many commits are you
> making? If you are issuing a lot of commits that would be a performance
> problem.
>
> Robin

Re: Performance issue with Spark's foreachPartition method

Posted by Robin East <ro...@xense.co.uk>.
The first question I would ask is have you determined whether you have a performance issue writing to Oracle? In particular how many commits are you making? If you are issuing a lot of commits that would be a performance problem.

Robin
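
One way to answer both questions is to count the commits and time them
directly inside the partition function. Below is a minimal sketch, assuming
a hypothetical CommitStats helper (not part of Spark or JDBC) whose timed()
method wraps whatever call performs the commit; here a no-op lambda stands
in for connect.commit().

```java
import java.util.concurrent.TimeUnit;

class CommitStats {
    private long commits = 0;
    private long nanos = 0;

    /** Runs the given commit action, recording how often and how long it ran. */
    void timed(Runnable commitAction) {
        long start = System.nanoTime();
        try {
            commitAction.run();
        } finally {
            nanos += System.nanoTime() - start;
            commits++;
        }
    }

    long commits() {
        return commits;
    }

    long elapsedMillis() {
        return TimeUnit.NANOSECONDS.toMillis(nanos);
    }

    public static void main(String[] args) {
        CommitStats stats = new CommitStats();
        for (int i = 0; i < 3; i++) {
            stats.timed(() -> { /* stand-in for connect.commit() */ });
        }
        System.out.println("commits=" + stats.commits()
                + " elapsedMs=" + stats.elapsedMillis());
    }
}
```

Because Connection.commit() throws the checked SQLException, a real call
would need a try/catch inside the lambda or a custom functional interface.
Logging these two numbers once per partition, at the end of call(), shows
whether the commit count matches the expected number of batches.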
