Posted to issues@spark.apache.org by "Hristo Angelov (JIRA)" <ji...@apache.org> on 2018/01/26 13:48:00 UTC
[jira] [Updated] (SPARK-23232) Mapping Dataset to a Java bean always set 1L to a long field
[ https://issues.apache.org/jira/browse/SPARK-23232?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hristo Angelov updated SPARK-23232:
-----------------------------------
Description:
I have the following streaming query:
{code:java}
baseDataSet
    .groupBy(window(col(UTC_DATE_TIME), applicationProperties.getProperty("current_active_users_window_length") + " minutes", "5 seconds"))
    .agg(approx_count_distinct(col(INTERNAL_USER_ID), applicationProperties.getDoubleProperty("approximate_distinct_count_error_percentage")).as("value"))
    .filter(col("window.end").leq(current_timestamp()))
    .select(unix_timestamp(col("window.end")).as("timestamp"), col("value"))
    .writeStream()
    .trigger(Trigger.ProcessingTime(applicationProperties.getIntegerProperty("current_active_users_trigger_interval"), TimeUnit.SECONDS))
    .format(ActiveUsersSinkProvider.class.getCanonicalName())
    .outputMode(OutputMode.Update())
    .option("checkpointLocation", SystemProperties.APP_CHECKPOINT_DIR + "/current_active_users")
    .start();
{code}
In the sink I'm trying to map the dataset to a Java bean with the following code:
{code:java}
data.as(Encoders.bean(LongTimeBased.class)).collectAsList()
{code}
where LongTimeBased is:
{code:java}
public class LongTimeBased {
    private long timestamp;
    private long value;

    public long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }

    public long getValue() {
        return value;
    }

    public void setValue(long value) {
        this.value = value;
    }
}
{code}
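One quick way to rule out the bean definition itself as the cause is to list its properties with the standard JavaBeans introspector. The sketch below assumes that Encoders.bean discovers fields through the usual getter/setter conventions; the class BeanPropertyCheck and the helper readWriteProperties are hypothetical names used only for illustration, and the nested LongTimeBased is a copy of the bean above. It does not reproduce the bug; it only checks that both properties are visible with type long.

```java
import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper class, not part of Spark or of the reported application.
public class BeanPropertyCheck {

    // Copy of the LongTimeBased bean from the report, nested to keep the sketch self-contained.
    public static class LongTimeBased {
        private long timestamp;
        private long value;

        public long getTimestamp() { return timestamp; }
        public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
        public long getValue() { return value; }
        public void setValue(long value) { this.value = value; }
    }

    // Returns property name -> Java type for every property that has both a getter and a setter.
    static Map<String, Class<?>> readWriteProperties(Class<?> beanClass) throws IntrospectionException {
        // Stopping at Object.class excludes the inherited "class" property.
        BeanInfo info = Introspector.getBeanInfo(beanClass, Object.class);
        Map<String, Class<?>> props = new TreeMap<>();
        for (PropertyDescriptor pd : info.getPropertyDescriptors()) {
            if (pd.getReadMethod() != null && pd.getWriteMethod() != null) {
                props.put(pd.getName(), pd.getPropertyType());
            }
        }
        return props;
    }

    public static void main(String[] args) throws IntrospectionException {
        // Prints one line per property, sorted by name.
        readWriteProperties(LongTimeBased.class)
            .forEach((name, type) -> System.out.println(name + " : " + type.getName()));
    }
}
```

If both timestamp and value are listed as long, the bean follows the JavaBeans conventions, which suggests the wrong value comes from the mapping step rather than from the class definition.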
Whatever data is aggregated, the timestamp is correct but the value field is always 1. When I read the value field directly from each row instead, it is correct:
{code:java}
for (Row row : data.collectAsList()) {
    Long value = row.getAs("value"); // correct value
}
{code}
> Mapping Dataset to a Java bean always set 1L to a long field
> ------------------------------------------------------------
>
> Key: SPARK-23232
> URL: https://issues.apache.org/jira/browse/SPARK-23232
> Project: Spark
> Issue Type: Bug
> Components: Java API
> Affects Versions: 2.2.0, 2.2.1
> Reporter: Hristo Angelov
> Priority: Critical