Posted to issues@spark.apache.org by "Hristo Angelov (JIRA)" <ji...@apache.org> on 2018/01/26 13:48:00 UTC

[jira] [Updated] (SPARK-23232) Mapping Dataset to a Java bean always set 1L to a long field

     [ https://issues.apache.org/jira/browse/SPARK-23232?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hristo Angelov updated SPARK-23232:
-----------------------------------
    Description: 
I have the following streaming query: 
{code:java}
baseDataSet
        .groupBy(window(col(UTC_DATE_TIME), applicationProperties.getProperty("current_active_users_window_length") + " minutes", "5 seconds"))
        .agg(approx_count_distinct(col(INTERNAL_USER_ID), applicationProperties.getDoubleProperty("approximate_distinct_count_error_percentage")).as("value"))
        .filter(col("window.end").leq(current_timestamp()))
        .select(unix_timestamp(col("window.end")).as("timestamp"), col("value"))
        .writeStream()
        .trigger(Trigger.ProcessingTime(applicationProperties.getIntegerProperty("current_active_users_trigger_interval"), TimeUnit.SECONDS))
        .format(ActiveUsersSinkProvider.class.getCanonicalName())
        .outputMode(OutputMode.Update())
        .option("checkpointLocation", SystemProperties.APP_CHECKPOINT_DIR + "/current_active_users")
        .start();
{code}
 

In the sink I'm trying to map the dataset to a Java bean with the following code:
{code:java}
data.as(Encoders.bean(LongTimeBased.class)).collectAsList()
{code}
where LongTimeBased is:
{code:java}
public class LongTimeBased { 
   private long timestamp; 
   private long value; 
 
   public long getTimestamp() {   
      return timestamp; 
   } 
   public void setTimestamp(long timestamp) { 
      this.timestamp = timestamp; 
   } 
   public long getValue() { 
      return value; 
   } 
   public void setValue(long value) { 
      this.value = value; 
   } 
}
{code}
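As a sanity check on the bean itself (not a reproduction of the Spark behaviour), the sketch below lists the properties that standard JavaBeans introspection finds on a class shaped like LongTimeBased. It assumes the bean encoder discovers fields through getter/setter pairs; the class name BeanCheck and the method propertyTypes are hypothetical names introduced only for this illustration.

```java
import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper class, not part of Spark or of the reporter's code.
public class BeanCheck {

    // Same shape as the LongTimeBased bean shown in the report.
    public static class LongTimeBased {
        private long timestamp;
        private long value;

        public long getTimestamp() { return timestamp; }
        public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
        public long getValue() { return value; }
        public void setValue(long value) { this.value = value; }
    }

    // Returns property name -> type name for every property that has
    // both a getter and a setter, sorted by property name.
    public static Map<String, String> propertyTypes(Class<?> beanClass)
            throws IntrospectionException {
        // Stop at Object so the inherited "class" property is not reported.
        BeanInfo info = Introspector.getBeanInfo(beanClass, Object.class);
        Map<String, String> result = new TreeMap<>();
        for (PropertyDescriptor pd : info.getPropertyDescriptors()) {
            if (pd.getReadMethod() != null && pd.getWriteMethod() != null) {
                result.put(pd.getName(), pd.getPropertyType().getName());
            }
        }
        return result;
    }

    public static void main(String[] args) throws IntrospectionException {
        // Prints {timestamp=long, value=long}
        System.out.println(propertyTypes(LongTimeBased.class));
    }
}
```

Both properties show up as primitive long with matching getters and setters, so under that assumption the bean definition alone does not explain why only the value field is wrong.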
 

Whatever data is aggregated, the timestamp field of the mapped bean is correct but the value field is always 1. When I read the value field directly from each row instead, it is correct:
{code:java}
for (Row row : data.collectAsList()) {
    Long value = row.getAs("value"); // correct value here
}
{code}
 

  was:
I have the following streaming query: 
{code:java}
baseDataSet
        .groupBy(window(col(UTC_DATE_TIME), applicationProperties.getProperty("current_active_users_window_length") + " minutes", "5 seconds"))
        .agg(approx_count_distinct(col(INTERNAL_USER_ID), applicationProperties.getDoubleProperty("approximate_distinct_count_error_percentage")).as("value"))
        .filter(col("window.end").leq(current_timestamp()))
        .select(unix_timestamp(col("window.end")).as("timestamp"), col("value"))
        .writeStream()
        .trigger(Trigger.ProcessingTime(applicationProperties.getIntegerProperty("current_active_users_trigger_interval"), TimeUnit.SECONDS))
        .format(ActiveUsersSinkProvider.class.getCanonicalName())
        .outputMode(OutputMode.Update())
        .option("checkpointLocation", SystemProperties.APP_CHECKPOINT_DIR + "/current_active_users")
        .start();
{code}
 

In the sink I'm trying to map the dataset to a Java bean with the following code:
{code:java}
data.as(Encoders.bean(LongTimeBased.class)).collectAsList()
{code}
where LongTimeBased is:
{code:java}
public class LongTimeBased { 
   private long timestamp; 
   private long value; 
 
   public long getTimestamp() {   
      return timestamp; 
   } 
   public void setTimestamp(long timestamp) { 
      this.timestamp = timestamp; 
   } 
   public long getValue() { 
      return value; 
   } 
   public void setValue(long value) { 
      this.value = value; 
   } 
}
{code}
 

So whatever data is aggregated value field is always 1. When I select the value field from every row, its value is correct:
{code:java}
for(Row row : data.collectAsList()) {
    Long value = row.getAs("value"); //correct value;
}
{code}
 


> Mapping Dataset to a Java bean always set 1L to a long field
> ------------------------------------------------------------
>
>                 Key: SPARK-23232
>                 URL: https://issues.apache.org/jira/browse/SPARK-23232
>             Project: Spark
>          Issue Type: Bug
>          Components: Java API
>    Affects Versions: 2.2.0, 2.2.1
>            Reporter: Hristo Angelov
>            Priority: Critical


