Posted to issues@spark.apache.org by "kumar (JIRA)" <ji...@apache.org> on 2018/04/26 01:51:00 UTC

[jira] [Comment Edited] (SPARK-24089) DataFrame.write().mode(SaveMode.Append).insertInto(TABLE)

    [ https://issues.apache.org/jira/browse/SPARK-24089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16453345#comment-16453345 ] 

kumar edited comment on SPARK-24089 at 4/26/18 1:50 AM:
--------------------------------------------------------

UNION just combines the DataFrames; that is not what I am looking for. I want to insert (append) data into an already existing table.
{code:java}
public enum SaveMode {
  /**
   * Append mode means that when saving a DataFrame to a data source, if data/table already exists, contents of the DataFrame are expected to be appended to existing data.
   *
   * @since 1.3.0
   */
  Append
{code}
_These comments are taken from the SaveMode enum's documentation for the Append value_; this is exactly what I am looking for, and I believe it is what my code does. But it is not working: the data is not appended, and an exception is thrown instead. That is the issue I am facing. Please check it.
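
For example, this is the behavior I expect (a minimal sketch, not my real code; the table name and paths are placeholders):
{code:java}
// Sketch of the Append semantics described above (paths are placeholders).
Dataset<Row> first = spark.read().json("logs/day1.json");
first.write().mode(SaveMode.Append).saveAsTable("mylogs");  // creates the table on first run

Dataset<Row> second = spark.read().json("logs/day2.json");
second.write().mode(SaveMode.Append).saveAsTable("mylogs"); // appends the new rows
{code}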


> DataFrame.write().mode(SaveMode.Append).insertInto(TABLE) 
> ----------------------------------------------------------
>
>                 Key: SPARK-24089
>                 URL: https://issues.apache.org/jira/browse/SPARK-24089
>             Project: Spark
>          Issue Type: Bug
>          Components: Java API, Spark Core, SQL
>    Affects Versions: 2.3.0
>            Reporter: kumar
>            Priority: Major
>              Labels: bug
>
> I am completely stuck on this issue and unable to progress further. For more info, please refer to this post: [https://stackoverflow.com/questions/49994085/spark-sql-2-3-dataframe-savemode-append-issue]
> I want to load multiple files one by one rather than all files at once. To achieve this I used SaveMode.Append, so that the second file's data is appended to the first file's data in the database, but it throws an exception:
> {code:java}
> Exception in thread "main" org.apache.spark.sql.AnalysisException: unresolved operator 'InsertIntoTable LogicalRDD [a1#4, b1#5, c1#6, d1#7], false, false, false;;
> 'InsertIntoTable LogicalRDD [a1#4, b1#5, c1#6, d1#7], false, false, false
> +- LogicalRDD [a1#22, b1#23, c1#24, d1#25], false
> {code}
> Code:
> {code:java}
> package com.log;
> import org.apache.spark.SparkContext;
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.api.java.function.Function;
> import org.apache.spark.sql.*;
> import org.apache.spark.sql.types.DataTypes;
> import org.apache.spark.sql.types.StructField;
> import org.apache.spark.sql.types.StructType;
> import org.apache.spark.storage.StorageLevel;
> import java.util.ArrayList;
> import java.util.List;
> public class TestApp {
>     private SparkSession spark;
>     private SparkContext sparkContext;
>     private SQLContext sqlContext;
>     private Dataset<Row> logDataFrame;
>     public TestApp() {
>         SparkSession spark = SparkSession.builder().appName("Simple Application")
>                 .config("spark.master", "local").getOrCreate();
>         SparkContext sc = spark.sparkContext();
>         this.spark = spark;
>         this.sparkContext = sc;
>     }
>     public static void main(String[] args) {
>         TestApp app = new TestApp();
>         String[] afiles = {"C:\\Users\\test\\Desktop\\logs\\log1.txt",
>                 "C:\\Users\\test\\Desktop\\logs\\log2.txt"};
>         for (String file : afiles) {
>             app.writeFileToSchema(file);
>         }
>     }
>     public void writeFileToSchema(String filePath) {
>         StructType schema = getSchema();
>         JavaRDD<Row> rowRDD = getRowRDD(filePath);
>         if (spark.catalog().tableExists("mylogs")) {
>             logDataFrame = spark.createDataFrame(rowRDD, schema);
>             logDataFrame.createOrReplaceTempView("temptable");
>             logDataFrame.write().mode(SaveMode.Append).insertInto("mylogs"); // throws the AnalysisException shown above
>         } else {
>             logDataFrame = spark.createDataFrame(rowRDD, schema);
>             logDataFrame.createOrReplaceTempView("mylogs");
>         }
>         Dataset<Row> results = spark.sql("SELECT count(b1) FROM mylogs");
>         List<Row> allrows = results.collectAsList();
>         System.out.println("Count:"+allrows);
>         sqlContext = logDataFrame.sqlContext();
>     }
>     public List<Row> getTagList() {
>         Dataset<Row> results = sqlContext.sql("SELECT distinct(b1) FROM mylogs");
>         List<Row> allrows = results.collectAsList();
>         return allrows;
>     }
>     public StructType getSchema() {
>         String schemaString = "a1 b1 c1 d1";
>         List<StructField> fields = new ArrayList<>();
>         for (String fieldName : schemaString.split(" ")) {
>             StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);
>             fields.add(field);
>         }
>         StructType schema = DataTypes.createStructType(fields);
>         return schema;
>     }
>     public JavaRDD<Row> getRowRDD(String filePath) {
>         JavaRDD<String> logRDD = sparkContext.textFile(filePath, 1).toJavaRDD();
>         JavaRDD<Row> rowRDD = logRDD
>                 .map((Function<String, Row>) line -> {
>                     String[] st = line.split(" ");
>                     return RowFactory.create(st[0], st[1], st[2], st[3]);
>                 });
>         rowRDD.persist(StorageLevel.MEMORY_ONLY());
>         return rowRDD;
>     }
> }
> {code}
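> 
> My guess (an assumption on my part, not confirmed): insertInto() resolves its target through the catalog as a real data-source or Hive table, while "mylogs" above is only a temp view over a LogicalRDD, so there is no underlying storage to insert into. A sketch of the variant of writeFileToSchema() I would expect to work, keeping "mylogs" as a persistent table:
> {code:java}
> // Sketch (untested assumption): keep "mylogs" as a persistent table created
> // with saveAsTable(), so that insertInto() has a real table to append to.
> logDataFrame = spark.createDataFrame(rowRDD, schema);
> if (spark.catalog().tableExists("mylogs")) {
>     logDataFrame.write().mode(SaveMode.Append).insertInto("mylogs");
> } else {
>     logDataFrame.write().saveAsTable("mylogs");
> }
> {code}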


