You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Amit Chauhan (Jira)" <ji...@apache.org> on 2020/05/27 07:14:00 UTC

[jira] [Created] (KAFKA-10049) KTable-KTable Foreign

Amit Chauhan created KAFKA-10049:
------------------------------------

             Summary: KTable-KTable Foreign 
                 Key: KAFKA-10049
                 URL: https://issues.apache.org/jira/browse/KAFKA-10049
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 2.5.0
            Reporter: Amit Chauhan


 

{code}

 

{{public static void main(String[] args) \{

     Properties props = new Properties();
     props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application-2");
     props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
     props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
     props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, new JSONSerdeComp<>().getClass());
     props.put(StreamsConfig.STATE_DIR_CONFIG, "C:\\temp");
     props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

     StreamsBuilder builder = new StreamsBuilder();
     KTable<String, OrderObject> ordersTable = builder.<String, OrderObject>table(TOPIC_Agora);
     KTable<String, StockMarketData> stockTable = builder.<String, StockMarketData>table(TOPIC_Stock_Data);

     KTable<String, EnrichedOrder> enriched = ordersTable.leftJoin(stockTable, OrderObject:: getSymbol, new ValueJoiner<OrderObject, StockMarketData, EnrichedOrder>() {

            @Override
            public EnrichedOrder apply(OrderObject order, StockMarketData stock) {
                EnrichedOrder enOrder = EnrichedOrder.builder()
                        .orderId(order.getOrderId())
                        .execPrice(order.getPrice())
                        .symbol(order.getSymbol())
                        .quanity(order.getQuanity())
                        .side(order.getSide())
                        .filledQty(order.getFilledQty())
                        .leaveQty(order.getLeaveQty())
                        .index(order.getIndex())
                        .vWaprelative(order.getVWaprelative())
                        .stockAsk(stock!=null?stock.getAsk().doubleValue():0.0)
                        .stockBid(stock!=null?stock.getBid().doubleValue():0.0)
                        .stockLast(stock!=null?stock.getLast().doubleValue():0.0)
                        .stockClose(stock!=null?stock.getClose().doubleValue():0.0)
                        .build();
                return enOrder;
            }
        } , Materialized.with(Serdes.String(), new JSONSerdeComp<>()));

     enriched.toStream().foreach(new ForeachAction<String, EnrichedOrder>() \{
         @Override
        public void apply(String arg0, EnrichedOrder arg1) {

             logger.info(String.format("key = %s, value = %s", arg0, arg1));
        }
    });

     KafkaStreams streams = new KafkaStreams(builder.build(), props);
     streams.start();

     Runtime.getRuntime().addShutdownHook(new Thread(() -> streams.close()));
}}}

{{}}

 

{{<dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.5.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>2.5.0</version>
    </dependency>}}{{}}

{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)