Posted to issues@spark.apache.org by "Michael Armbrust (JIRA)" <ji...@apache.org> on 2015/09/03 20:43:46 UTC

[jira] [Resolved] (SPARK-10357) DataFrames unable to drop unwanted columns

     [ https://issues.apache.org/jira/browse/SPARK-10357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Michael Armbrust resolved SPARK-10357.
--------------------------------------
    Resolution: Won't Fix

The exception is being thrown from within the CSV parsing library used by the external data source. The data never makes it into Spark SQL, so there is nothing we can fix on our side.
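
For context, the header line of the file in the report below ends with ",,,", which gives commons-csv several empty column names; CSVFormat rejects the duplicate '' entries while the scan is being built, before Spark SQL ever sees a row, which is why selecting or dropping columns on the DataFrame cannot avoid the error. Below is a minimal sketch of one possible workaround that bypasses spark-csv's header handling; it is not code from this issue, and it assumes a Spark 1.4.1 shell (so sc and sqlContext are in scope) and a hypothetical input path "crimes.csv".

{code}
// Sketch only (not from this issue): read the file as plain text, clean and
// de-duplicate the header yourself, then build the DataFrame and drop the
// placeholder columns. Assumes a Spark 1.4.1 shell and a hypothetical path.
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField, StructType, StringType}

// The failure itself comes from commons-csv header validation, e.g.
//   org.apache.commons.csv.CSVFormat.DEFAULT.withHeader("x", "y", "", "")
// throws IllegalArgumentException: The header contains a duplicate entry: ''

val lines  = sc.textFile("crimes.csv")          // hypothetical input path
val header = lines.first()

// Replace spaces and give every empty header field a unique placeholder name.
val names = header.split(",", -1).map(_.trim.replaceAll(" ", "_")).zipWithIndex
  .map { case (n, i) => if (n.isEmpty) s"_c$i" else n }

val schema = StructType(names.map(n => StructField(n, StringType, nullable = true)))
val rows = lines.filter(_ != header)
  .map(l => Row.fromSeq(l.split(",", -1).padTo(names.length, "")))

val df = sqlContext.createDataFrame(rows, schema)

// Now the unwanted placeholder columns can actually be dropped.
val clean = df.select(names.filterNot(_.startsWith("_c")).map(df.col): _*)
{code}

Alternatively, removing the trailing commas from the header line of the source file avoids the duplicate-entry check without any code changes.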

> DataFrames unable to drop unwanted columns
> ------------------------------------------
>
>                 Key: SPARK-10357
>                 URL: https://issues.apache.org/jira/browse/SPARK-10357
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.4.1
>            Reporter: Randy Gelhausen
>
> spark-csv seems to be exposing an issue with DataFrames: an inability to drop unwanted columns.
> Related GitHub issue: https://github.com/databricks/spark-csv/issues/61
> My data (with header) looks like:
> MI_PRINX,offense_id,rpt_date,occur_date,occur_time,poss_date,poss_time,beat,apt_office_prefix,apt_office_num,location,MinOfucr,MinOfibr_code,dispo_code,MaxOfnum_victims,Shift,Avg Day,loc_type,UC2 Literal,neighborhood,npu,x,y,,,
> 934782,90360664,2/5/2009,2/3/2009,13:50:00,2/3/2009,15:00:00,305,NULL,NULL,55 MCDONOUGH BLVD SW,670,2308,NULL,1,Day,Tue,35,LARCENY-NON VEHICLE,South Atlanta,Y,-84.38654,33.72024,,,
> 934783,90370891,2/6/2009,2/6/2009,8:50:00,2/6/2009,10:45:00,502,NULL,NULL,464 ANSLEY WALK TER NW,640,2305,NULL,1,Day,Fri,18,LARCENY-FROM VEHICLE,Ansley Park,E,-84.37276,33.79685,,,
> Despite using sqlContext to remove columns from the DataFrame (I also tried the programmatic raw.select, with the same result), attempts to operate on it cause failures.
> Snippet:
> {code}
> // Read CSV file, clean field names
> val raw = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("DROPMALFORMED", "true").load(input)
> val columns = raw.columns.map(x => x.replaceAll(" ", "_"))
> raw.toDF(columns:_*).registerTempTable(table)
> val clean = sqlContext.sql("select " + columns.filter(x => x.length() > 0 && x != " ").mkString(", ") + " from " + table)
> System.err.println(clean.schema)
> System.err.println(clean.columns.mkString(","))
> System.err.println(clean.take(1).mkString("|"))
> {code}
> StackTrace:
> {code}
> 15/08/30 18:23:13 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, docker.dev, NODE_LOCAL, 1482 bytes)
> 15/08/30 18:23:14 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on docker.dev:58272 (size: 1811.0 B, free: 530.0 MB)
> 15/08/30 18:23:14 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on docker.dev:58272 (size: 21.9 KB, free: 530.0 MB)
> 15/08/30 18:23:15 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 1350 ms on docker.dev (1/1)
> 15/08/30 18:23:15 INFO scheduler.DAGScheduler: ResultStage 0 (take at CsvRelation.scala:174) finished in 1.354 s
> 15/08/30 18:23:15 INFO cluster.YarnScheduler: Removed TaskSet 0.0, whose tasks have all completed, from pool 
> 15/08/30 18:23:15 INFO scheduler.DAGScheduler: Job 0 finished: take at CsvRelation.scala:174, took 1.413674 s
> StructType(StructField(MI_PRINX,StringType,true), StructField(offense_id,StringType,true), StructField(rpt_date,StringType,true), StructField(occur_date,StringType,true), StructField(occur_time,StringType,true), StructField(poss_date,StringType,true), StructField(poss_time,StringType,true), StructField(beat,StringType,true), StructField(apt_office_prefix,StringType,true), StructField(apt_office_num,StringType,true), StructField(location,StringType,true), StructField(MinOfucr,StringType,true), StructField(MinOfibr_code,StringType,true), StructField(dispo_code,StringType,true), StructField(MaxOfnum_victims,StringType,true), StructField(Shift,StringType,true), StructField(Avg_Day,StringType,true), StructField(loc_type,StringType,true), StructField(UC2_Literal,StringType,true), StructField(neighborhood,StringType,true), StructField(npu,StringType,true), StructField(x,StringType,true), StructField(y,StringType,true))
> MI_PRINX,offense_id,rpt_date,occur_date,occur_time,poss_date,poss_time,beat,apt_office_prefix,apt_office_num,location,MinOfucr,MinOfibr_code,dispo_code,MaxOfnum_victims,Shift,Avg_Day,loc_type,UC2_Literal,neighborhood,npu,x,y
> 15/08/30 18:23:16 INFO storage.MemoryStore: ensureFreeSpace(232400) called with curMem=259660, maxMem=278019440
> 15/08/30 18:23:16 INFO storage.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 227.0 KB, free 264.7 MB)
> 15/08/30 18:23:16 INFO storage.MemoryStore: ensureFreeSpace(22377) called with curMem=492060, maxMem=278019440
> 15/08/30 18:23:16 INFO storage.MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 21.9 KB, free 264.6 MB)
> 15/08/30 18:23:16 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on 172.17.0.19:41088 (size: 21.9 KB, free: 265.1 MB)
> 15/08/30 18:23:16 INFO spark.SparkContext: Created broadcast 2 from textFile at TextFile.scala:30
> Exception in thread "main" java.lang.IllegalArgumentException: The header contains a duplicate entry: '' in [MI_PRINX, offense_id, rpt_date, occur_date, occur_time, poss_date, poss_time, beat, apt_office_prefix, apt_office_num, location, MinOfucr, MinOfibr_code, dispo_code, MaxOfnum_victims, Shift, Avg Day, loc_type, UC2 Literal, neighborhood, npu, x, y, , , ]
> 	at org.apache.commons.csv.CSVFormat.validate(CSVFormat.java:770)
> 	at org.apache.commons.csv.CSVFormat.<init>(CSVFormat.java:364)
> 	at org.apache.commons.csv.CSVFormat.withHeader(CSVFormat.java:882)
> 	at com.databricks.spark.csv.CsvRelation.tokenRdd(CsvRelation.scala:84)
> 	at com.databricks.spark.csv.CsvRelation.buildScan(CsvRelation.scala:105)
> 	at org.apache.spark.sql.sources.DataSourceStrategy$.apply(DataSourceStrategy.scala:101)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
> 	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
> 	at org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:300)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
> 	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
> 	at org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:314)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
> 	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
> 	at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:943)
> 	at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:941)
> 	at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:947)
> 	at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:947)
> 	at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1269)
> 	at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1203)
> 	at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1262)
> 	at com.github.randerzander.CSVLoad$.main(CSVLoad.scala:29)
> 	at com.github.randerzander.CSVLoad.main(CSVLoad.scala)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:497)
> 	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
> 	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
> 	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
> 	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
> 	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org