Posted to issues@spark.apache.org by "kumar (JIRA)" <ji...@apache.org> on 2018/04/25 17:03:00 UTC
[jira] [Created] (SPARK-24089) DataFrame.write.mode(SaveMode.Append).insertInto(TABLE)
kumar created SPARK-24089:
-----------------------------
Summary: DataFrame.write.mode(SaveMode.Append).insertInto(TABLE)
Key: SPARK-24089
URL: https://issues.apache.org/jira/browse/SPARK-24089
Project: Spark
Issue Type: Bug
Components: Spark Core, SQL
Affects Versions: 2.3.0
Reporter: kumar
I am completely stuck on this issue and unable to progress further. For more information, please refer to this post: [https://stackoverflow.com/questions/49994085/spark-sql-2-3-dataframe-savemode-append-issue]
I want to load multiple files one by one, rather than loading them all at once. To achieve this I used SaveMode.Append, so that the second file's data is appended to the first file's data in the table, but an exception is thrown.
Code:
{code:java}
package com.log;

import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.storage.StorageLevel;

import java.util.ArrayList;
import java.util.List;

public class TestApp {

    private SparkSession spark;
    private SparkContext sparkContext;
    private SQLContext sqlContext;
    private Dataset<Row> logDataFrame;

    public TestApp() {
        SparkSession spark = SparkSession.builder().appName("Simple Application")
                .config("spark.master", "local").getOrCreate();
        this.spark = spark;
        this.sparkContext = spark.sparkContext();
    }

    public static void main(String[] args) {
        TestApp app = new TestApp();
        String[] afiles = {"C:\\Users\\test\\Desktop\\logs\\log1.txt",
                "C:\\Users\\test\\Desktop\\logs\\log2.txt"};
        // Load the files one by one; the second iteration should append.
        for (String file : afiles) {
            app.writeFileToSchema(file);
        }
    }

    public void writeFileToSchema(String filePath) {
        StructType schema = getSchema();
        JavaRDD<Row> rowRDD = getRowRDD(filePath);
        if (spark.catalog().tableExists("mylogs")) {
            logDataFrame = spark.createDataFrame(rowRDD, schema);
            logDataFrame.createOrReplaceTempView("temptable");
            logDataFrame.write().mode(SaveMode.Append).insertInto("mylogs"); // exception thrown here
        } else {
            // First pass: "mylogs" is only registered as a temporary view,
            // not created as a table in the catalog.
            logDataFrame = spark.createDataFrame(rowRDD, schema);
            logDataFrame.createOrReplaceTempView("mylogs");
        }
        Dataset<Row> results = spark.sql("SELECT count(b1) FROM mylogs");
        List<Row> allrows = results.collectAsList();
        System.out.println("Count:" + allrows);
        sqlContext = logDataFrame.sqlContext();
    }

    public List<Row> getTagList() {
        Dataset<Row> results = sqlContext.sql("SELECT distinct(b1) FROM mylogs");
        return results.collectAsList();
    }

    public StructType getSchema() {
        String schemaString = "a1 b1 c1 d1";
        List<StructField> fields = new ArrayList<>();
        for (String fieldName : schemaString.split(" ")) {
            fields.add(DataTypes.createStructField(fieldName, DataTypes.StringType, true));
        }
        return DataTypes.createStructType(fields);
    }

    public JavaRDD<Row> getRowRDD(String filePath) {
        JavaRDD<String> logRDD = sparkContext.textFile(filePath, 1).toJavaRDD();
        JavaRDD<Row> rowRDD = logRDD
                .map((Function<String, Row>) line -> {
                    String[] st = line.split(" ");
                    return RowFactory.create(st[0], st[1], st[2], st[3]);
                });
        rowRDD.persist(StorageLevel.MEMORY_ONLY());
        return rowRDD;
    }
}
{code}
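For reference, insertInto() expects "mylogs" to be a table known to the session catalog, but the code above only ever registers it as a temporary view over an RDD-backed DataFrame. A minimal sketch of a possible workaround, assuming the same SparkSession and the getRowRDD()/getSchema() helpers from the code above, is to let saveAsTable() create and append to a real catalog table instead:
{code:java}
// Sketch of a possible workaround (assumes the same spark, getRowRDD() and
// getSchema() members as in the code above). With SaveMode.Append,
// saveAsTable() creates the catalog table on the first call and appends on
// every later call, so the tableExists() branch is no longer needed.
public void writeFileToSchema(String filePath) {
    Dataset<Row> logDataFrame = spark.createDataFrame(getRowRDD(filePath), getSchema());
    logDataFrame.write().mode(SaveMode.Append).saveAsTable("mylogs");

    Dataset<Row> results = spark.sql("SELECT count(b1) FROM mylogs");
    System.out.println("Count:" + results.collectAsList());
}
{code}
With this, each file's rows accumulate in the same table across the iterations of the loop in main().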