You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "dileep (JIRA)" <ji...@apache.org> on 2016/01/21 15:54:39 UTC

[jira] [Issue Comment Deleted] (SPARK-12843) Spark should avoid scanning all partitions when limit is set

     [ https://issues.apache.org/jira/browse/SPARK-12843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

dileep updated SPARK-12843:
---------------------------
    Comment: was deleted

(was: public class JavaSparkSQL {
	
  /**
   * Serializable Java bean holding one record of the people file.
   *
   * <p>It exposes two read/write properties, {@code name} and {@code age},
   * through conventional getter/setter pairs. Instances are created with the
   * implicit no-argument constructor and populated via the setters.
   */
  public static class Person implements Serializable {
    private int age;
    private String name;

    /** Returns the stored age. */
    public int getAge() {
      return this.age;
    }

    /** Replaces the stored age. */
    public void setAge(int age) {
      this.age = age;
    }

    /** Returns the stored name; {@code null} until a name has been set. */
    public String getName() {
      return this.name;
    }

    /** Replaces the stored name. */
    public void setName(String name) {
      this.name = name;
    }
  }

  /**
   * Loads a comma-separated people file into a DataFrame, runs
   * {@code SELECT * FROM people limit 1} against it, collects the result and
   * prints the elapsed wall-clock time in milliseconds.
   *
   * <p>Elapsed time is measured with {@link System#nanoTime()}. The earlier
   * version took {@code System.currentTimeMillis() % 1000} at both ends, which
   * keeps only the millisecond-within-second part of each timestamp, so the
   * printed difference was wrong for runs longer than a second and could even
   * be negative.
   *
   * @param args command-line arguments; not used
   * @throws Exception if the Spark job fails
   */
  public static void main(String[] args) throws Exception {
    // Monotonic start timestamp; only the difference to a later reading is meaningful.
    long startNanos = System.nanoTime();
    SparkConf sparkConf = new SparkConf().setAppName("JavaSparkSQL").setMaster("local[4]");
    JavaSparkContext ctx = new JavaSparkContext(sparkConf);
    try {
      SQLContext sqlContext = new SQLContext(ctx);
      // Load a text file and convert each line to a Java Bean.
      JavaRDD<Person> people = ctx.textFile("/home/394036/spark-1.6.0-bin-hadoop2.3/examples/src/main/resources/people_1.txt").map(
        new Function<String, Person>() {
          @Override
          public Person call(String line) {
            // Each line is expected to look like "<name>,<age>".
            String[] parts = line.split(",");

            Person person = new Person();
            person.setName(parts[0]);
            person.setAge(Integer.parseInt(parts[1].trim()));

            return person;
          }
        });
      // Apply a schema to an RDD of Java Beans and register it as a table.
      DataFrame schemaPeople = sqlContext.createDataFrame(people, Person.class);
      schemaPeople.registerTempTable("people");
      // SQL can be run over RDDs that have been registered as tables.
      //DataFrame teenagers = sqlContext.sql("SELECT age, name FROM people WHERE age >= 13 AND age <= 19");
      //DataFrame teenagers = sqlContext.sql("SELECT * FROM people");
      DataFrame teenagers = sqlContext.sql("SELECT * FROM people limit 1");
      teenagers.cache();
      // The results of SQL queries are DataFrames and support all the normal RDD operations.
      // The columns of a row in the result can be accessed by ordinal.
      // NOTE(review): People is not declared in this snippet; presumably it is a bean
      // defined elsewhere (or Person was intended) - confirm.
      // NOTE(review): assumes column 0 is age and column 1 is name - confirm against the schema.
      List<People> teenagerNames = teenagers.toJavaRDD().map(new Function<Row, People>() {
        @Override
        public People call(Row row) {
          People people = new People();
          people.setAge(row.getInt(0));
          people.setName(row.getString(1));
          //System.out.println(people.toString());
          return people;
        }
      }).collect();

      // collect() has returned, so the query has run; report the elapsed time in milliseconds.
      long elapsedMillis = (System.nanoTime() - startNanos) / 1000000L;
      System.out.println("difference = " + elapsedMillis);
/*    
    
    for (String name: teenagerNames) {
      System.out.println("=====================>"+name);
    }
*/



    /*
    System.out.println("=== Data source: Parquet File ===");
    // DataFrames can be saved as parquet files, maintaining the schema information.
    schemaPeople.write().parquet("people.parquet");

    // Read in the parquet file created above.
    // Parquet files are self-describing so the schema is preserved.
    // The result of loading a parquet file is also a DataFrame.
    DataFrame parquetFile = sqlContext.read().parquet("people.parquet");

    //Parquet files can also be registered as tables and then used in SQL statements.
    parquetFile.registerTempTable("parquetFile");
    
    DataFrame teenagers2 = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19");
    
    teenagerNames = teenagers2.toJavaRDD().map(new Function<Row, String>() {
      @Override
      public String call(Row row) {
          return "Name: " + row.getString(0);
      }
    }).collect();
    for (String name: teenagerNames) {
      System.out.println(name);
    }

    System.out.println("=== Data source: JSON Dataset ===");
    // A JSON dataset is pointed by path.
    // The path can be either a single text file or a directory storing text files.
    String path = "/home/394036/spark-1.6.0-bin-hadoop2.3/examples/src/main/resources/people.json";
    // Create a DataFrame from the file(s) pointed by path
    DataFrame peopleFromJsonFile = sqlContext.read().json(path);

    // Because the schema of a JSON dataset is automatically inferred, to write queries,
    // it is better to take a look at what is the schema.
    peopleFromJsonFile.printSchema();
    // The schema of people is ...
    // root
    //  |-- age: IntegerType
    //  |-- name: StringType

    // Register this DataFrame as a table.
    peopleFromJsonFile.registerTempTable("people");

    // SQL statements can be run by using the sql methods provided by sqlContext.
    DataFrame teenagers3 = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");

    // The results of SQL queries are DataFrame and support all the normal RDD operations.
    // The columns of a row in the result can be accessed by ordinal.
    teenagerNames = teenagers3.toJavaRDD().map(new Function<Row, String>() {
      @Override
      public String call(Row row) { return "Name: " + row.getString(0); }
    }).collect();
    for (String name: teenagerNames) {
      System.out.println(name);
    }

    // Alternatively, a DataFrame can be created for a JSON dataset represented by
    // a RDD[String] storing one JSON object per string.
    List<String> jsonData = Arrays.asList(
          "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
    JavaRDD<String> anotherPeopleRDD = ctx.parallelize(jsonData);
    DataFrame peopleFromJsonRDD = sqlContext.read().json(anotherPeopleRDD.rdd());

    // Take a look at the schema of this new DataFrame.
    peopleFromJsonRDD.printSchema();
    // The schema of anotherPeople is ...
    // root
    //  |-- address: StructType
    //  |    |-- city: StringType
    //  |    |-- state: StringType
    //  |-- name: StringType

    peopleFromJsonRDD.registerTempTable("people2");

    DataFrame peopleWithCity = sqlContext.sql("SELECT name, address.city FROM people2");
    List<String> nameAndCity = peopleWithCity.toJavaRDD().map(new Function<Row, String>() {
      @Override
      public String call(Row row) {
        return "Name: " + row.getString(0) + ", City: " + row.getString(1);
      }
    }).collect();
    for (String name: nameAndCity) {
      System.out.println(name);
    }
*/
    } finally {
      // Stop the context even when the job above throws, so it is not left running.
      ctx.stop();
    }
  }
})

> Spark should avoid scanning all partitions when limit is set
> ------------------------------------------------------------
>
>                 Key: SPARK-12843
>                 URL: https://issues.apache.org/jira/browse/SPARK-12843
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.0
>            Reporter: Maciej Bryński
>
> SQL Query:
> {code}
> select * from table limit 100
> {code}
> forces Spark to scan all partitions even when the data is available at the beginning of the scan.
> This behaviour should be avoided; the scan should stop once enough data has been collected.
> Is it related to: [SPARK-9850] ?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org