Posted to issues@spark.apache.org by "Marco Gaido (JIRA)" <ji...@apache.org> on 2018/04/25 19:37:00 UTC
[jira] [Commented] (SPARK-24089) DataFrame.write().mode(SaveMode.Append).insertInto(TABLE)
[ https://issues.apache.org/jira/browse/SPARK-24089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16452937#comment-16452937 ]
Marco Gaido commented on SPARK-24089:
-------------------------------------
Blocker can be set only by committers; I moved it to Critical.
> DataFrame.write().mode(SaveMode.Append).insertInto(TABLE)
> ----------------------------------------------------------
>
> Key: SPARK-24089
> URL: https://issues.apache.org/jira/browse/SPARK-24089
> Project: Spark
> Issue Type: Bug
> Components: Java API, Spark Core, SQL
> Affects Versions: 2.3.0
> Reporter: kumar
> Priority: Critical
> Labels: bug
>
> I am completely stuck with this issue and unable to progress further. For more info, please refer to this post: [https://stackoverflow.com/questions/49994085/spark-sql-2-3-dataframe-savemode-append-issue]
> I want to load multiple files one by one rather than loading all files at once. To achieve this I used SaveMode.Append, so that the second file's data is appended to the first file's data in the database, but it throws the exception below.
> {code:java}
> Exception in thread "main" org.apache.spark.sql.AnalysisException: unresolved operator 'InsertIntoTable LogicalRDD [a1#4, b1#5, c1#6, d1#7], false, false, false;;
> 'InsertIntoTable LogicalRDD [a1#4, b1#5, c1#6, d1#7], false, false, false
> +- LogicalRDD [a1#22, b1#23, c1#24, d1#25], false
> {code}
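>
> A note on the error (an interpretation, not a confirmed diagnosis): insertInto resolves its target table through the catalog, and in the code below the only registration of "mylogs" is a temp view over a LogicalRDD created with createOrReplaceTempView, which leaves the InsertIntoTable operator unresolved. A minimal sketch of a write path that targets a persistent table instead, reusing the names from the code below (assuming a warehouse table named "mylogs" is acceptable; this is only a sketch, not a verified fix for this ticket):
> {code:java}
> // Sketch: append to a catalog table rather than inserting into a temp view.
> // saveAsTable creates the "mylogs" table on the first call and appends on later calls.
> logDataFrame = spark.createDataFrame(rowRDD, schema);
> logDataFrame.write().mode(SaveMode.Append).saveAsTable("mylogs");
> {code}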
> Code:
> {code:java}
> package com.log;
>
> import org.apache.spark.SparkContext;
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.api.java.function.Function;
> import org.apache.spark.sql.*;
> import org.apache.spark.sql.types.DataTypes;
> import org.apache.spark.sql.types.StructField;
> import org.apache.spark.sql.types.StructType;
> import org.apache.spark.storage.StorageLevel;
>
> import java.util.ArrayList;
> import java.util.List;
> public class TestApp {
>
>     private SparkSession spark;
>     private SparkContext sparkContext;
>     private SQLContext sqlContext;
>
>     public TestApp() {
>         SparkSession spark = SparkSession.builder().appName("Simple Application")
>                 .config("spark.master", "local").getOrCreate();
>         SparkContext sc = spark.sparkContext();
>         this.spark = spark;
>         this.sparkContext = sc;
>     }
>
>     public static void main(String[] args) {
>         TestApp app = new TestApp();
>         String[] afiles = {"C:\\Users\\test\\Desktop\\logs\\log1.txt",
>                 "C:\\Users\\test\\Desktop\\logs\\log2.txt"};
>         for (String file : afiles) {
>             app.writeFileToSchema(file);
>         }
>     }
>
>     public void writeFileToSchema(String filePath) {
>         StructType schema = getSchema();
>         JavaRDD<Row> rowRDD = getRowRDD(filePath);
>         if (spark.catalog().tableExists("mylogs")) {
>             // "mylogs" already registered: try to append the new file's rows to it.
>             logDataFrame = spark.createDataFrame(rowRDD, schema);
>             logDataFrame.createOrReplaceTempView("temptable");
>             logDataFrame.write().mode(SaveMode.Append).insertInto("mylogs"); // exception
>         } else {
>             // First file: register the data as the "mylogs" temp view.
>             logDataFrame = spark.createDataFrame(rowRDD, schema);
>             logDataFrame.createOrReplaceTempView("mylogs");
>         }
>         Dataset<Row> results = spark.sql("SELECT count(b1) FROM mylogs");
>         List<Row> allrows = results.collectAsList();
>         System.out.println("Count:" + allrows);
>         sqlContext = logDataFrame.sqlContext();
>     }
>
>     Dataset<Row> logDataFrame;
>
>     public List<Row> getTagList() {
>         Dataset<Row> results = sqlContext.sql("SELECT distinct(b1) FROM mylogs");
>         List<Row> allrows = results.collectAsList();
>         return allrows;
>     }
>
>     public StructType getSchema() {
>         String schemaString = "a1 b1 c1 d1";
>         List<StructField> fields = new ArrayList<>();
>         for (String fieldName : schemaString.split(" ")) {
>             StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);
>             fields.add(field);
>         }
>         StructType schema = DataTypes.createStructType(fields);
>         return schema;
>     }
>
>     public JavaRDD<Row> getRowRDD(String filePath) {
>         // Read the log file and split each line into four whitespace-separated columns.
>         JavaRDD<String> logRDD = sparkContext.textFile(filePath, 1).toJavaRDD();
>         JavaRDD<Row> rowRDD = logRDD
>                 .map((Function<String, Row>) line -> {
>                     String[] st = line.split(" ");
>                     return RowFactory.create(st[0], st[1], st[2], st[3]);
>                 });
>         rowRDD.persist(StorageLevel.MEMORY_ONLY());
>         return rowRDD;
>     }
> }
> {code}
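>
> For the stated goal of loading the files one at a time without a database table, a further sketch is to accumulate the per-file DataFrames with union and re-register the temp view after each file (again only an illustrative alternative, assuming an in-memory temp view is acceptable for the follow-up queries):
> {code:java}
> // Sketch: accumulate per-file data with union instead of insertInto.
> Dataset<Row> next = spark.createDataFrame(rowRDD, schema);
> logDataFrame = (logDataFrame == null) ? next : logDataFrame.union(next);
> logDataFrame.createOrReplaceTempView("mylogs");
> {code}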
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org