You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Amit Chauhan (Jira)" <ji...@apache.org> on 2020/05/27 07:14:00 UTC
[jira] [Created] (KAFKA-10049) KTable-KTable Foreign
Amit Chauhan created KAFKA-10049:
------------------------------------
Summary: KTable-KTable Foreign
Key: KAFKA-10049
URL: https://issues.apache.org/jira/browse/KAFKA-10049
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 2.5.0
Reporter: Amit Chauhan
{code}
{{public static void main(String[] args) \{
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application-2");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, new JSONSerdeComp<>().getClass());
props.put(StreamsConfig.STATE_DIR_CONFIG, "C:\\temp");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
StreamsBuilder builder = new StreamsBuilder();
KTable<String, OrderObject> ordersTable = builder.<String, OrderObject>table(TOPIC_Agora);
KTable<String, StockMarketData> stockTable = builder.<String, StockMarketData>table(TOPIC_Stock_Data);
KTable<String, EnrichedOrder> enriched = ordersTable.leftJoin(stockTable, OrderObject:: getSymbol, new ValueJoiner<OrderObject, StockMarketData, EnrichedOrder>() {
@Override
public EnrichedOrder apply(OrderObject order, StockMarketData stock) {
EnrichedOrder enOrder = EnrichedOrder.builder()
.orderId(order.getOrderId())
.execPrice(order.getPrice())
.symbol(order.getSymbol())
.quanity(order.getQuanity())
.side(order.getSide())
.filledQty(order.getFilledQty())
.leaveQty(order.getLeaveQty())
.index(order.getIndex())
.vWaprelative(order.getVWaprelative())
.stockAsk(stock!=null?stock.getAsk().doubleValue():0.0)
.stockBid(stock!=null?stock.getBid().doubleValue():0.0)
.stockLast(stock!=null?stock.getLast().doubleValue():0.0)
.stockClose(stock!=null?stock.getClose().doubleValue():0.0)
.build();
return enOrder;
}
} , Materialized.with(Serdes.String(), new JSONSerdeComp<>()));
enriched.toStream().foreach(new ForeachAction<String, EnrichedOrder>() \{
@Override
public void apply(String arg0, EnrichedOrder arg1) {
logger.info(String.format("key = %s, value = %s", arg0, arg1));
}
});
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(() -> streams.close()));
}}}
{{}}
{{<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.5.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.5.0</version>
</dependency>}}{{}}
{code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)