Posted to issues@spark.apache.org by "Saanvi Sharma (JIRA)" <ji...@apache.org> on 2017/11/23 06:42:00 UTC
[jira] [Updated] (SPARK-22588) SPARK: Load Data from Dataframe or RDD to DynamoDB / dealing with null values
[ https://issues.apache.org/jira/browse/SPARK-22588?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Saanvi Sharma updated SPARK-22588:
----------------------------------
Description:
I am using Spark 2.1 on EMR and I have a DataFrame like this:
ClientNum | Value_1 | Value_2 | Value_3 | Value_4
14        | A       | B       | C       | null
19        | X       | Y       | null    | null
21        | R       | null    | null    | null
I want to load this data into a DynamoDB table with ClientNum as the key, following these references:
Analyze Your Data on Amazon DynamoDB with Apache Spark
Using Spark SQL for ETL
Here is the code I tried:
import java.util.HashMap

import org.apache.hadoop.dynamodb.DynamoDBItemWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.JobConf
import com.amazonaws.services.dynamodbv2.model.AttributeValue

// Configure the EMR-DDB connector: table, endpoint, throughput, and I/O formats
val jobConf = new JobConf(sc.hadoopConfiguration)
jobConf.set("dynamodb.servicename", "dynamodb")
jobConf.set("dynamodb.input.tableName", "table_name")
jobConf.set("dynamodb.output.tableName", "table_name")
jobConf.set("dynamodb.endpoint", "dynamodb.eu-west-1.amazonaws.com")
jobConf.set("dynamodb.regionid", "eu-west-1")
jobConf.set("dynamodb.throughput.read", "1")
jobConf.set("dynamodb.throughput.read.percent", "1")
jobConf.set("dynamodb.throughput.write", "1")
jobConf.set("dynamodb.throughput.write.percent", "1")
jobConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")
jobConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat")
// Import the data
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load(path)
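As an aside, Spark 2.x ships a built-in CSV reader, so the external spark-csv package is not strictly needed; an equivalent read (assuming the same path variable) would be:
val df = sqlContext.read.option("header", "true").option("inferSchema", "true").csv(path)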
I performed a transformation so that I have an RDD matching the types the DynamoDB custom output format knows how to write: it expects a tuple containing the Text and DynamoDBItemWritable types. The following map call creates a new RDD with those types:
// Convert the DataFrame to an RDD
val df_rdd = df.rdd
> df_rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[10] at rdd at <console>:41
// Print the first row of the RDD
df_rdd.take(1)
> res12: Array[org.apache.spark.sql.Row] = Array([14,A,B,C,null])
val ddbInsertFormattedRDD = df_rdd.map(a => {
  val ddbMap = new HashMap[String, AttributeValue]()

  val ClientNum = new AttributeValue()
  ClientNum.setN(a.get(0).toString)
  ddbMap.put("ClientNum", ClientNum)

  val Value_1 = new AttributeValue()
  Value_1.setS(a.get(1).toString)
  ddbMap.put("Value_1", Value_1)

  val Value_2 = new AttributeValue()
  Value_2.setS(a.get(2).toString)
  ddbMap.put("Value_2", Value_2)

  val Value_3 = new AttributeValue()
  Value_3.setS(a.get(3).toString)
  ddbMap.put("Value_3", Value_3)

  // a.get(i) returns null for missing values, so toString throws the NPE below
  val Value_4 = new AttributeValue()
  Value_4.setS(a.get(4).toString)
  ddbMap.put("Value_4", Value_4)

  val item = new DynamoDBItemWritable()
  item.setItem(ddbMap)
  (new Text(""), item)
})
This last call uses the job configuration that defines the EMR-DDB connector to write out the new RDD in the expected format:
ddbInsertFormattedRDD.saveAsHadoopDataset(jobConf)
This fails with the following error:
Caused by: java.lang.NullPointerException
The null values caused the error: if I keep only ClientNum and Value_1, it works and the data is correctly inserted into the DynamoDB table.
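A minimal null-safe rewrite of the map avoids the NPE by checking each optional column with isNullAt before converting it, so toString is never called on null (a sketch, assuming ClientNum itself is never null; DynamoDB simply stores the item with fewer attributes):

val ddbInsertSafeRDD = df_rdd.map(a => {
  val ddbMap = new HashMap[String, AttributeValue]()
  // Key column: assumed non-null
  val clientNum = new AttributeValue()
  clientNum.setN(a.get(0).toString)
  ddbMap.put("ClientNum", clientNum)
  // Optional columns: skip nulls instead of calling toString on them
  for ((name, i) <- Seq("Value_1" -> 1, "Value_2" -> 2, "Value_3" -> 3, "Value_4" -> 4)) {
    if (!a.isNullAt(i)) {
      val av = new AttributeValue()
      av.setS(a.get(i).toString)
      ddbMap.put(name, av)
    }
  }
  val item = new DynamoDBItemWritable()
  item.setItem(ddbMap)
  (new Text(""), item)
})
ddbInsertSafeRDD.saveAsHadoopDataset(jobConf)

Omitting the attribute is also the natural mapping here: DynamoDB rejects empty-string attribute values, so there is no good placeholder to write instead.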
Thanks for your help !!
> SPARK: Load Data from Dataframe or RDD to DynamoDB / dealing with null values
> -----------------------------------------------------------------------------
>
> Key: SPARK-22588
> URL: https://issues.apache.org/jira/browse/SPARK-22588
> Project: Spark
> Issue Type: Question
> Components: Deploy
> Affects Versions: 2.1.1
> Reporter: Saanvi Sharma
> Priority: Minor
> Labels: dynamodb, spark
> Original Estimate: 24h
> Remaining Estimate: 24h
>
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org