Posted to issues@spark.apache.org by "Hristo Angelov (JIRA)" <ji...@apache.org> on 2018/01/26 13:47:01 UTC

[jira] [Created] (SPARK-23232) Mapping Dataset to a Java bean always set 1L to a long field

Hristo Angelov created SPARK-23232:
--------------------------------------

             Summary: Mapping Dataset to a Java bean always set 1L to a long field
                 Key: SPARK-23232
                 URL: https://issues.apache.org/jira/browse/SPARK-23232
             Project: Spark
          Issue Type: Bug
          Components: Java API
    Affects Versions: 2.2.1, 2.2.0
            Reporter: Hristo Angelov


I have the following streaming query: 
{code:java}
baseDataSet
        .groupBy(window(col(UTC_DATE_TIME), applicationProperties.getProperty("current_active_users_window_length") + " minutes", "5 seconds"))
        .agg(approx_count_distinct(col(INTERNAL_USER_ID), applicationProperties.getDoubleProperty("approximate_distinct_count_error_percentage")).as("value"))
        .filter(col("window.end").leq(current_timestamp()))
        .select(unix_timestamp(col("window.end")).as("timestamp"), col("value"))
        .writeStream()
        .trigger(Trigger.ProcessingTime(applicationProperties.getIntegerProperty("current_active_users_trigger_interval"), TimeUnit.SECONDS))
        .format(ActiveUsersSinkProvider.class.getCanonicalName())
        .outputMode(OutputMode.Update())
        .option("checkpointLocation", SystemProperties.APP_CHECKPOINT_DIR + "/current_active_users")
        .start();
{code}
 

In the sink I'm trying to map the dataset to a Java bean with the following code:
{code:java}
data.as(Encoders.bean(LongTimeBased.class)).collectAsList();
{code}
where LongTimeBased is:
{code:java}
public class LongTimeBased { 
   private long timestamp; 
   private long value; 
 
   public long getTimestamp() {   
      return timestamp; 
   } 
   public void setTimestamp(long timestamp) { 
      this.timestamp = timestamp; 
   } 
   public long getValue() { 
      return value; 
   } 
   public void setValue(long value) { 
      this.value = value; 
   } 
}
{code}
 

Whatever data is aggregated, the value field of the mapped bean is always 1. When I read the value field directly from each row instead, the value is correct:
{code:java}
for(Row row : data.collectAsList()) {
    Long value = row.getAs("value"); //correct value;
}
{code}
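
To rule out the bean class itself, one quick check is to confirm that standard JavaBeans introspection sees both fields as readable and writable long properties. This is a minimal sketch using only java.beans from the JDK. The class name BeanPropertyCheck and the helper readWriteProperties are hypothetical names introduced for illustration, and it is an assumption that the bean encoder resolves the same set of getter/setter pairs.

```java
import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper class, not part of Spark or of the original report.
public class BeanPropertyCheck {

    // Copy of the bean from the report, nested here so the example is self-contained.
    public static class LongTimeBased {
        private long timestamp;
        private long value;

        public long getTimestamp() { return timestamp; }
        public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
        public long getValue() { return value; }
        public void setValue(long value) { this.value = value; }
    }

    // Returns every property that has both a getter and a setter, keyed by name
    // (sorted alphabetically) and mapped to its declared type.
    static Map<String, Class<?>> readWriteProperties(Class<?> beanClass)
            throws IntrospectionException {
        // Stop at Object.class so the synthetic "class" property is excluded.
        BeanInfo info = Introspector.getBeanInfo(beanClass, Object.class);
        Map<String, Class<?>> props = new TreeMap<>();
        for (PropertyDescriptor pd : info.getPropertyDescriptors()) {
            if (pd.getReadMethod() != null && pd.getWriteMethod() != null) {
                props.put(pd.getName(), pd.getPropertyType());
            }
        }
        return props;
    }

    public static void main(String[] args) throws Exception {
        // Print the discovered property names and types for manual inspection.
        System.out.println(readWriteProperties(LongTimeBased.class));
    }
}
```

If both timestamp and value are reported with type long, the bean definition follows the JavaBeans conventions, which would suggest the wrong value comes from the mapping step rather than from the class.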
 


