You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "kumar (JIRA)" <ji...@apache.org> on 2018/04/25 17:03:00 UTC

[jira] [Created] (SPARK-24089) DataFrame.write.mode(SaveMode.Append).insertInto(TABLE)

kumar created SPARK-24089:
-----------------------------

             Summary: DataFrame.write.mode(SaveMode.Append).insertInto(TABLE) 
                 Key: SPARK-24089
                 URL: https://issues.apache.org/jira/browse/SPARK-24089
             Project: Spark
          Issue Type: Bug
          Components: Spark Core, SQL
    Affects Versions: 2.3.0
            Reporter: kumar


I am completely stuck with this issue, unable to progress further. For more info pls refer this post : [https://stackoverflow.com/questions/49994085/spark-sql-2-3-dataframe-savemode-append-issue]

I want to load multiple files one by one, don't want to load all files at a time. To achieve this i used SaveMode.Append, so that 2nd file data will be added to 1st file data in database, but it's throwing exception.

Code:
{code:java}
package com.log;

import com.log.common.RegexMatch;
import com.log.spark.SparkProcessor;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.storage.StorageLevel;

import java.util.ArrayList;
import java.util.List;

public class TestApp {

    private SparkSession spark;
    private SparkContext sparkContext;
    private SQLContext sqlContext;

    public TestApp() {
        SparkSession spark = SparkSession.builder().appName("Simple Application")
                .config("spark.master", "local").getOrCreate();

        SparkContext sc = spark.sparkContext();

        this.spark = spark;
        this.sparkContext = sc;
    }

    public static void main(String[] args) {
        TestApp app = new TestApp();

        String[] afiles = {"C:\\Users\\test\\Desktop\\logs\\log1.txt",
                "C:\\Users\\test\\Desktop\\logs\\log2.txt"};

        for (String file : afiles) {
            app.writeFileToSchema(file);
        }
    }

    public void writeFileToSchema(String filePath) {

        StructType schema = getSchema();
        JavaRDD<Row> rowRDD = getRowRDD(filePath);

        if (spark.catalog().tableExists("mylogs")) {

            logDataFrame = spark.createDataFrame(rowRDD, schema);
            logDataFrame.createOrReplaceTempView("temptable");
            logDataFrame.write().mode(SaveMode.Append).insertInto("mylogs");//exception
        } else {
            logDataFrame = spark.createDataFrame(rowRDD, schema);
            logDataFrame.createOrReplaceTempView("mylogs");
        }

        Dataset<Row> results = spark.sql("SELECT count(b1) FROM mylogs");

        List<Row> allrows = results.collectAsList();

        System.out.println("Count:"+allrows);

        sqlContext = logDataFrame.sqlContext();
    }

    Dataset<Row> logDataFrame;

    public List<Row> getTagList() {

        Dataset<Row> results = sqlContext.sql("SELECT distinct(b1) FROM mylogs");
        List<Row> allrows = results.collectAsList();

        return allrows;
    }

    public StructType getSchema() {
        String schemaString = "a1 b1 c1 d1";

        List<StructField> fields = new ArrayList<>();
        for (String fieldName : schemaString.split(" ")) {
            StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);
            fields.add(field);
        }

        StructType schema = DataTypes.createStructType(fields);

        return schema;
    }


    public JavaRDD<Row> getRowRDD(String filePath) {

        JavaRDD<String> logRDD = sparkContext.textFile(filePath, 1).toJavaRDD();

        RegexMatch reg = new RegexMatch();
        JavaRDD<Row> rowRDD = logRDD

                .map((Function<String, Row>) line -> {

                    String[] st = line.split(" ");

                    return RowFactory.create(st[0], st[1], st[2], st[3]);
                });

        rowRDD.persist(StorageLevel.MEMORY_ONLY());

        return rowRDD;
    }
}
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org