Posted to issues@spark.apache.org by "dileep (JIRA)" <ji...@apache.org> on 2016/01/21 15:54:39 UTC
[jira] [Issue Comment Deleted] (SPARK-12843) Spark should avoid scanning all partitions when limit is set
[ https://issues.apache.org/jira/browse/SPARK-12843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
dileep updated SPARK-12843:
---------------------------
Comment: was deleted
(was: import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

public class JavaSparkSQL {
  public static class Person implements Serializable {
    private String name;
    private int age;

    public String getName() {
      return name;
    }

    public void setName(String name) {
      this.name = name;
    }

    public int getAge() {
      return age;
    }

    public void setAge(int age) {
      this.age = age;
    }
  }
  public static void main(String[] args) throws Exception {
    // Record the start time. Use the raw millisecond value (no % 1000, as in
    // the original paste) so the elapsed-time difference below cannot wrap
    // around and go negative.
    long millis1 = System.currentTimeMillis();
    SparkConf sparkConf = new SparkConf().setAppName("JavaSparkSQL").setMaster("local[4]");
    JavaSparkContext ctx = new JavaSparkContext(sparkConf);
    SQLContext sqlContext = new SQLContext(ctx);

    // Load a text file and convert each line to a Java Bean.
    JavaRDD<Person> people = ctx.textFile("/home/394036/spark-1.6.0-bin-hadoop2.3/examples/src/main/resources/people_1.txt").map(
      new Function<String, Person>() {
        @Override
        public Person call(String line) {
          String[] parts = line.split(",");
          Person person = new Person();
          person.setName(parts[0]);
          person.setAge(Integer.parseInt(parts[1].trim()));
          return person;
        }
      });
    // Apply a schema to an RDD of Java Beans and register it as a table.
    DataFrame schemaPeople = sqlContext.createDataFrame(people, Person.class);
    schemaPeople.registerTempTable("people");

    // SQL can be run over RDDs that have been registered as tables.
    //DataFrame teenagers = sqlContext.sql("SELECT age, name FROM people WHERE age >= 13 AND age <= 19");
    //DataFrame teenagers = sqlContext.sql("SELECT * FROM people");
    DataFrame teenagers = sqlContext.sql("SELECT * FROM people limit 1");
    teenagers.cache();
    // The results of SQL queries are DataFrames and support all the normal RDD operations.
    // The columns of a row in the result can be accessed by ordinal; columns derived
    // from a Java Bean are ordered alphabetically, so age is column 0 and name is column 1.
    List<Person> teenagerNames = teenagers.toJavaRDD().map(new Function<Row, Person>() {
      @Override
      public Person call(Row row) {
        Person person = new Person();
        person.setAge(row.getInt(0));
        person.setName(row.getString(1));
        //System.out.println(person.toString());
        return person;
      }
    }).collect();
    long millis2 = System.currentTimeMillis();
    long millis3 = millis2 - millis1;
    System.out.println("difference = " + String.valueOf(millis3));
    /*
    for (Person p: teenagerNames) {
      System.out.println("=====================>" + p.getName());
    }
    */
    /*
    System.out.println("=== Data source: Parquet File ===");
    // DataFrames can be saved as parquet files, maintaining the schema information.
    schemaPeople.write().parquet("people.parquet");

    // Read in the parquet file created above.
    // Parquet files are self-describing, so the schema is preserved.
    // The result of loading a parquet file is also a DataFrame.
    DataFrame parquetFile = sqlContext.read().parquet("people.parquet");

    // Parquet files can also be registered as tables and then used in SQL statements.
    parquetFile.registerTempTable("parquetFile");
    DataFrame teenagers2 = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19");
    List<String> names = teenagers2.toJavaRDD().map(new Function<Row, String>() {
      @Override
      public String call(Row row) {
        return "Name: " + row.getString(0);
      }
    }).collect();
    for (String name: names) {
      System.out.println(name);
    }

    System.out.println("=== Data source: JSON Dataset ===");
    // A JSON dataset is pointed to by a path.
    // The path can be either a single text file or a directory storing text files.
    String path = "/home/394036/spark-1.6.0-bin-hadoop2.3/examples/src/main/resources/people.json";
    // Create a DataFrame from the file(s) pointed to by the path.
    DataFrame peopleFromJsonFile = sqlContext.read().json(path);

    // Because the schema of a JSON dataset is automatically inferred,
    // it is useful to look at the inferred schema before writing queries.
    peopleFromJsonFile.printSchema();
    // The schema of people is ...
    // root
    //  |-- age: IntegerType
    //  |-- name: StringType

    // Register this DataFrame as a table.
    peopleFromJsonFile.registerTempTable("people");

    // SQL statements can be run by using the sql methods provided by sqlContext.
    DataFrame teenagers3 = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");

    // The results of SQL queries are DataFrames and support all the normal RDD operations.
    // The columns of a row in the result can be accessed by ordinal.
    names = teenagers3.toJavaRDD().map(new Function<Row, String>() {
      @Override
      public String call(Row row) { return "Name: " + row.getString(0); }
    }).collect();
    for (String name: names) {
      System.out.println(name);
    }

    // Alternatively, a DataFrame can be created for a JSON dataset represented by
    // an RDD[String] storing one JSON object per string.
    List<String> jsonData = Arrays.asList(
      "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
    JavaRDD<String> anotherPeopleRDD = ctx.parallelize(jsonData);
    DataFrame peopleFromJsonRDD = sqlContext.read().json(anotherPeopleRDD.rdd());

    // Take a look at the schema of this new DataFrame.
    peopleFromJsonRDD.printSchema();
    // The schema of anotherPeople is ...
    // root
    //  |-- address: StructType
    //  |    |-- city: StringType
    //  |    |-- state: StringType
    //  |-- name: StringType
    peopleFromJsonRDD.registerTempTable("people2");

    DataFrame peopleWithCity = sqlContext.sql("SELECT name, address.city FROM people2");
    List<String> nameAndCity = peopleWithCity.toJavaRDD().map(new Function<Row, String>() {
      @Override
      public String call(Row row) {
        return "Name: " + row.getString(0) + ", City: " + row.getString(1);
      }
    }).collect();
    for (String name: nameAndCity) {
      System.out.println(name);
    }
    */
    ctx.stop();
  }
})
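The benchmark above only infers the scanning behaviour from wall-clock time. A more direct check, sketched here against the same Spark 1.6 Java API and reusing the sqlContext and the registered "people" table from the program above, is to print the query plan and see whether the Limit operator sits on top of a scan of the whole table:
{code}
// Minimal sketch, assuming the sqlContext and temp table "people" from the
// program above. explain(true) prints the logical and physical plans to
// stdout; if the physical plan shows a Limit over a full table scan, every
// partition will still be read even though only one row is requested.
DataFrame limited = sqlContext.sql("SELECT * FROM people limit 1");
limited.explain(true);
{code}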
> Spark should avoid scanning all partitions when limit is set
> ------------------------------------------------------------
>
> Key: SPARK-12843
> URL: https://issues.apache.org/jira/browse/SPARK-12843
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 1.6.0
> Reporter: Maciej Bryński
>
> SQL Query:
> {code}
> select * from table limit 100
> {code}
> forces Spark to scan all partitions even when the requested rows are available at the beginning of the scan.
> This behaviour should be avoided: the scan should stop once enough rows have been collected.
> Is this related to [SPARK-9850]?
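Until the planner stops over-scanning, one possible workaround is to drop down to the RDD level, where take() fetches partitions incrementally (it runs on the first partition and enlarges the scan only if more rows are needed). This is an editor's sketch against the 1.6 Java API, not from the report; "table" stands for any registered temp table:
{code}
// Hypothetical workaround sketch: instead of SELECT ... LIMIT 100, run the
// unlimited query and take 100 rows from the underlying RDD. JavaRDD.take(n)
// scans partitions incrementally, so it can stop early when the first
// partitions already contain enough rows.
List<Row> firstHundred =
    sqlContext.sql("SELECT * FROM table").toJavaRDD().take(100);
{code}
Note that this brings the rows to the driver much like collect() would; the incremental scan is a property of RDD take(), not of the SQL planner.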
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)