Posted to users@apex.apache.org by rohit garg <ro...@gmail.com> on 2017/05/13 05:25:22 UTC

[jira] [Commented] (APEXCORE-718) How to Use one Kafka input operator and Feed to multiple operator

Did anyone get a chance to look at it?

-- 


   ---------------RohitGarg.

Re: [jira] [Commented] (APEXCORE-718) How to Use one Kafka input operator and Feed to multiple operator

Posted by AJAY GUPTA <aj...@gmail.com>.
dag.addStream("MapData3", ipversionMapCreator.output, esOutput.input)
    .setLocality(Locality.CONTAINER_LOCAL);
dag.addStream("MapData4", httpResponseCodeMapCreator.output, esOutput.input)
    .setLocality(Locality.CONTAINER_LOCAL);

Your esOutput.input port is connected to two upstream operators, which is
not possible (reason: it breaks idempotency). Your esOutput operator needs
two input ports to receive data from the two upstream operators, or you can
add an additional operator that unifies the two upstreams.
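
To make the second option concrete, here is a rough sketch of a merge
operator with two input ports and one output port. It is plain Java with no
Apex dependency: OutPort, InPort, MapMerger and MergeDemo are hypothetical
stand-ins written for this example, not Apex classes. In a real application
the ports would be DefaultInputPort / DefaultOutputPort and the operator
would extend BaseOperator; the stream name "MergedData" is also made up.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical stand-in for an output port: forwards each tuple to all sinks.
class OutPort<T> {
    private final List<Consumer<T>> sinks = new ArrayList<>();
    void connect(Consumer<T> sink) { sinks.add(sink); }
    void emit(T tuple) { for (Consumer<T> sink : sinks) sink.accept(tuple); }
}

// Hypothetical stand-in for an input port: accepts exactly one stream.
class InPort<T> {
    private final Consumer<T> handler;
    private String stream; // name of the connected stream, null if unconnected

    InPort(Consumer<T> handler) { this.handler = handler; }

    void attach(String streamName, OutPort<T> source) {
        if (stream != null) {
            // Mirrors the rule behind the exception in the issue.
            throw new IllegalArgumentException(
                "Port already connected to stream " + stream);
        }
        stream = streamName;
        source.connect(handler);
    }
}

// Merge operator: each upstream gets its own input port, and both ports
// forward tuples unchanged to the single output port.
class MapMerger {
    final OutPort<Map<String, Object>> output = new OutPort<>();
    final InPort<Map<String, Object>> input1 = new InPort<>(output::emit);
    final InPort<Map<String, Object>> input2 = new InPort<>(output::emit);
}

class MergeDemo {
    static List<Map<String, Object>> run() {
        OutPort<Map<String, Object>> ipVersionMaps = new OutPort<>();
        OutPort<Map<String, Object>> responseCodeMaps = new OutPort<>();
        MapMerger merger = new MapMerger();

        // Stand-in for esOutput.input: just records what it receives.
        List<Map<String, Object>> received = new ArrayList<>();
        InPort<Map<String, Object>> esInput = new InPort<>(received::add);

        // One stream per input port, then a single stream into the sink.
        merger.input1.attach("MapData3", ipVersionMaps);
        merger.input2.attach("MapData4", responseCodeMaps);
        esInput.attach("MergedData", merger.output);

        ipVersionMaps.emit(Collections.<String, Object>singletonMap("ipv4", 10L));
        responseCodeMaps.emit(Collections.<String, Object>singletonMap("200", 7L));
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

With this shape, the two addStream calls from the issue would target the
merger's two input ports, and a third stream would connect the merger's
output to esOutput.input.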


Thanks,
Ajay

Copy-pasting the issue for reference:

public void populateDAG(DAG dag, Configuration conf)
{
    KafkaSinglePortStringInputOperator kafkaReader =
        dag.addOperator("MessageReader", new KafkaSinglePortStringInputOperator());

    IPVersionKeyValueCreator ipversionKeyValueCreator =
        dag.addOperator("ipversionKeyValueCreator", new IPVersionKeyValueCreator());
    SumKeyVal<String, Long> ipversionSumOperator =
        getSumKeyValOperator("ipversionSum", dag);
    IPVersionMapCreator ipversionMapCreator =
        dag.addOperator("ipversionMapCreator", new IPVersionMapCreator());

    //Operators for HTTP Response Code Count
    HttpResponseCodeKeyValueCreator httpResponseCodeKeyValueCreator =
        dag.addOperator("httpResponseCodeKeyValueCreator", new HttpResponseCodeKeyValueCreator());
    SumKeyVal<String, Long> httpResponseCodeSumOperator =
        getSumKeyValOperator("httpResponseCodeSum", dag);
    HttpResponseCodeMapCreator httpResponseCodeMapCreator =
        dag.addOperator("httpResponseCodeMapCreator", new HttpResponseCodeMapCreator());

    ElasticSearchMapOutputOperator esOutput =
        dag.addOperator("elasticSearchOperator", new ElasticSearchMapOutputOperator());

    try {
      esOutput.setStore(createStore());
    } catch (IOException e) {
      e.printStackTrace();
    }

    esOutput.setIndexName("test_apex_kafka_es");
    esOutput.setType("test_apex_es");
    System.out.println("Writing to ES");

    dag.addStream("kafkaData3", kafkaReader.outputPort,
        ipversionKeyValueCreator.input, httpResponseCodeKeyValueCreator.input)
        .setLocality(Locality.CONTAINER_LOCAL);
    dag.addStream("JsonData3", ipversionKeyValueCreator.output, ipversionSumOperator.data)
        .setLocality(Locality.CONTAINER_LOCAL);
    dag.addStream("ConsumptionData3", ipversionSumOperator.sum, ipversionMapCreator.input)
        .setLocality(Locality.CONTAINER_LOCAL);
    dag.addStream("MapData3", ipversionMapCreator.output, esOutput.input)
        .setLocality(Locality.CONTAINER_LOCAL);

    //Streams for HTTP Response Code Count
    //dag.addStream("kafkaData4", kafkaReader.outputPort, httpResponseCodeKeyValueCreator.input).setLocality(Locality.CONTAINER_LOCAL);
    dag.addStream("JsonData4", httpResponseCodeKeyValueCreator.output, httpResponseCodeSumOperator.data)
        .setLocality(Locality.CONTAINER_LOCAL);
    dag.addStream("ConsumptionData4", httpResponseCodeSumOperator.sum, httpResponseCodeMapCreator.input)
        .setLocality(Locality.CONTAINER_LOCAL);
    dag.addStream("MapData4", httpResponseCodeMapCreator.output, esOutput.input)
        .setLocality(Locality.CONTAINER_LOCAL);
}

==========================================================


java.lang.IllegalArgumentException: Port input already connected to stream LogicalPlan.StreamMeta[id=MapData3]
        at com.datatorrent.stram.plan.logical.LogicalPlan$StreamMeta.addSink(LogicalPlan.java:548)
        at com.datatorrent.stram.plan.logical.LogicalPlan.addStream(LogicalPlan.java:1429)
        at com.datatorrent.stram.plan.logical.LogicalPlan.addStream(LogicalPlan.java:1480)
        at com.datatorrent.stram.plan.logical.LogicalPlan.addStream(LogicalPlan.java:125)
        at org.jio.media.Application.populateDAG(Application.java:99)

On Sat, May 13, 2017 at 10:55 AM, rohit garg <ro...@gmail.com> wrote:

>
> Did anyone get a chance to look at it?
>
> --
>
>
>    ---------------RohitGarg.