You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Amit Chauhan (Jira)" <ji...@apache.org> on 2020/05/27 07:19:00 UTC
[jira] [Updated] (KAFKA-10049) KTable-KTable Foreign Key join
throwing Serialization Exception
[ https://issues.apache.org/jira/browse/KAFKA-10049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Amit Chauhan updated KAFKA-10049:
---------------------------------
Description:
I want to make use of the _KTable-KTable_ join feature released in *_2.5.0_*, but I am facing an issue while running the code.
{code:java}
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application-2");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, new JSONSerdeComp<>().getClass());
props.put(StreamsConfig.STATE_DIR_CONFIG, "C:\\temp");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
StreamsBuilder builder = new StreamsBuilder();
KTable<String, OrderObject> ordersTable = builder.<String, OrderObject>table(TOPIC_Agora);
KTable<String, StockMarketData> stockTable = builder.<String, StockMarketData>table(TOPIC_Stock_Data);
KTable<String, EnrichedOrder> enriched = ordersTable.leftJoin(stockTable, OrderObject:: getSymbol, new ValueJoiner<OrderObject, StockMarketData, EnrichedOrder>() {
@Override
public EnrichedOrder apply(OrderObject order, StockMarketData stock) {
EnrichedOrder enOrder = EnrichedOrder.builder()
.orderId(order.getOrderId())
.execPrice(order.getPrice())
.symbol(order.getSymbol())
.quanity(order.getQuanity())
.side(order.getSide())
.filledQty(order.getFilledQty())
.leaveQty(order.getLeaveQty())
.index(order.getIndex())
.vWaprelative(order.getVWaprelative())
.stockAsk(stock!=null?stock.getAsk().doubleValue():0.0)
.stockBid(stock!=null?stock.getBid().doubleValue():0.0)
.stockLast(stock!=null?stock.getLast().doubleValue():0.0)
.stockClose(stock!=null?stock.getClose().doubleValue():0.0)
.build();
return enOrder;
}
} , Materialized.with(Serdes.String(), new JSONSerdeComp<>()));
enriched.toStream().foreach(new ForeachAction<String, EnrichedOrder>() {
@Override
public void apply(String arg0, EnrichedOrder arg1) {
logger.info(String.format("key = %s, value = %s", arg0, arg1));
}
});
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(() -> streams.close()));
}
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.5.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.5.0</version>
</dependency>
{code}
was:
{code}
{{public static void main(String[] args) \{
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application-2");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, new JSONSerdeComp<>().getClass());
props.put(StreamsConfig.STATE_DIR_CONFIG, "C:\\temp");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
StreamsBuilder builder = new StreamsBuilder();
KTable<String, OrderObject> ordersTable = builder.<String, OrderObject>table(TOPIC_Agora);
KTable<String, StockMarketData> stockTable = builder.<String, StockMarketData>table(TOPIC_Stock_Data);
KTable<String, EnrichedOrder> enriched = ordersTable.leftJoin(stockTable, OrderObject:: getSymbol, new ValueJoiner<OrderObject, StockMarketData, EnrichedOrder>() {
@Override
public EnrichedOrder apply(OrderObject order, StockMarketData stock) {
EnrichedOrder enOrder = EnrichedOrder.builder()
.orderId(order.getOrderId())
.execPrice(order.getPrice())
.symbol(order.getSymbol())
.quanity(order.getQuanity())
.side(order.getSide())
.filledQty(order.getFilledQty())
.leaveQty(order.getLeaveQty())
.index(order.getIndex())
.vWaprelative(order.getVWaprelative())
.stockAsk(stock!=null?stock.getAsk().doubleValue():0.0)
.stockBid(stock!=null?stock.getBid().doubleValue():0.0)
.stockLast(stock!=null?stock.getLast().doubleValue():0.0)
.stockClose(stock!=null?stock.getClose().doubleValue():0.0)
.build();
return enOrder;
}
} , Materialized.with(Serdes.String(), new JSONSerdeComp<>()));
enriched.toStream().foreach(new ForeachAction<String, EnrichedOrder>() \{
@Override
public void apply(String arg0, EnrichedOrder arg1) {
logger.info(String.format("key = %s, value = %s", arg0, arg1));
}
});
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(() -> streams.close()));
}}}
{{}}
{{<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.5.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.5.0</version>
</dependency>}}{{}}
{code}
Summary: KTable-KTable Foreign Key join throwing Serialization Exception (was: KTable-KTable Foreign )
> KTable-KTable Foreign Key join throwing Serialization Exception
> ----------------------------------------------------------------
>
> Key: KAFKA-10049
> URL: https://issues.apache.org/jira/browse/KAFKA-10049
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.5.0
> Reporter: Amit Chauhan
> Priority: Blocker
>
> I want to make use of the _KTable-KTable_ join feature released in *_2.5.0_*, but I am facing an issue while running the code.
> {code:java}
>
> public static void main(String[] args) {
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application-2");
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
> props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, new JSONSerdeComp<>().getClass());
> props.put(StreamsConfig.STATE_DIR_CONFIG, "C:\\temp");
> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> StreamsBuilder builder = new StreamsBuilder();
> KTable<String, OrderObject> ordersTable = builder.<String, OrderObject>table(TOPIC_Agora);
> KTable<String, StockMarketData> stockTable = builder.<String, StockMarketData>table(TOPIC_Stock_Data);
> KTable<String, EnrichedOrder> enriched = ordersTable.leftJoin(stockTable, OrderObject:: getSymbol, new ValueJoiner<OrderObject, StockMarketData, EnrichedOrder>() {
> @Override
> public EnrichedOrder apply(OrderObject order, StockMarketData stock) {
> EnrichedOrder enOrder = EnrichedOrder.builder()
> .orderId(order.getOrderId())
> .execPrice(order.getPrice())
> .symbol(order.getSymbol())
> .quanity(order.getQuanity())
> .side(order.getSide())
> .filledQty(order.getFilledQty())
> .leaveQty(order.getLeaveQty())
> .index(order.getIndex())
> .vWaprelative(order.getVWaprelative())
> .stockAsk(stock!=null?stock.getAsk().doubleValue():0.0)
> .stockBid(stock!=null?stock.getBid().doubleValue():0.0)
> .stockLast(stock!=null?stock.getLast().doubleValue():0.0)
> .stockClose(stock!=null?stock.getClose().doubleValue():0.0)
> .build();
> return enOrder;
> }
> } , Materialized.with(Serdes.String(), new JSONSerdeComp<>()));
> enriched.toStream().foreach(new ForeachAction<String, EnrichedOrder>() {
> @Override
> public void apply(String arg0, EnrichedOrder arg1) {
> logger.info(String.format("key = %s, value = %s", arg0, arg1));
> }
> });
> KafkaStreams streams = new KafkaStreams(builder.build(), props);
> streams.start();
> Runtime.getRuntime().addShutdownHook(new Thread(() -> streams.close()));
> }
>
> <dependency>
> <groupId>org.apache.kafka</groupId>
> <artifactId>kafka-clients</artifactId>
> <version>2.5.0</version>
> </dependency>
> <dependency>
> <groupId>org.apache.kafka</groupId>
> <artifactId>kafka-streams</artifactId>
> <version>2.5.0</version>
> </dependency>
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)