Posted to users@apex.apache.org by rohit garg <ro...@gmail.com> on 2017/05/13 05:25:22 UTC
[jira] [Commented] (APEXCORE-718) How to Use one Kafka input operator
and Feed to multiple operator
Did anyone get a chance to look at this?
--
---------------RohitGarg.
Re: [jira] [Commented] (APEXCORE-718) How to Use one Kafka input
operator and Feed to multiple operator
Posted by AJAY GUPTA <aj...@gmail.com>.
dag.addStream("MapData3", ipversionMapCreator.output ,
esOutput.input).setLocality(Locality.CONTAINER_LOCAL);
dag.addStream("MapData4", httpResponseCodeMapCreator.output ,
esOutput.input).setLocality(Locality.CONTAINER_LOCAL);
Your esOutput.input port is connected to two upstream operators (streams
MapData3 and MapData4), which is not allowed (reason: it breaks
idempotency). Either give your esOutput operator two input ports, one for
each upstream operator, or add an intermediate operator that merges the two
upstreams into a single stream before esOutput.
Thanks,
Ajay
Copying the issue here for reference:
public void populateDAG(DAG dag, Configuration conf)
{
    KafkaSinglePortStringInputOperator kafkaReader =
        dag.addOperator("MessageReader", new KafkaSinglePortStringInputOperator());

    // Operators for IP version count
    IPVersionKeyValueCreator ipversionKeyValueCreator =
        dag.addOperator("ipversionKeyValueCreator", new IPVersionKeyValueCreator());
    SumKeyVal<String, Long> ipversionSumOperator =
        getSumKeyValOperator("ipversionSum", dag);
    IPVersionMapCreator ipversionMapCreator =
        dag.addOperator("ipversionMapCreator", new IPVersionMapCreator());

    // Operators for HTTP Response Code Count
    HttpResponseCodeKeyValueCreator httpResponseCodeKeyValueCreator =
        dag.addOperator("httpResponseCodeKeyValueCreator", new HttpResponseCodeKeyValueCreator());
    SumKeyVal<String, Long> httpResponseCodeSumOperator =
        getSumKeyValOperator("httpResponseCodeSum", dag);
    HttpResponseCodeMapCreator httpResponseCodeMapCreator =
        dag.addOperator("httpResponseCodeMapCreator", new HttpResponseCodeMapCreator());

    ElasticSearchMapOutputOperator esOutput =
        dag.addOperator("elasticSearchOperator", new ElasticSearchMapOutputOperator());
    try {
        esOutput.setStore(createStore());
    } catch (IOException e) {
        e.printStackTrace();
    }
    esOutput.setIndexName("test_apex_kafka_es");
    esOutput.setType("test_apex_es");
    System.out.println("Writing to ES");

    // Streams for IP version count
    dag.addStream("kafkaData3", kafkaReader.outputPort,
        ipversionKeyValueCreator.input, httpResponseCodeKeyValueCreator.input)
        .setLocality(Locality.CONTAINER_LOCAL);
    dag.addStream("JsonData3", ipversionKeyValueCreator.output,
        ipversionSumOperator.data).setLocality(Locality.CONTAINER_LOCAL);
    dag.addStream("ConsumptionData3", ipversionSumOperator.sum,
        ipversionMapCreator.input).setLocality(Locality.CONTAINER_LOCAL);
    dag.addStream("MapData3", ipversionMapCreator.output,
        esOutput.input).setLocality(Locality.CONTAINER_LOCAL);

    // Streams for HTTP Response Code Count
    // dag.addStream("kafkaData4", kafkaReader.outputPort, httpResponseCodeKeyValueCreator.input).setLocality(Locality.CONTAINER_LOCAL);
    dag.addStream("JsonData4", httpResponseCodeKeyValueCreator.output,
        httpResponseCodeSumOperator.data).setLocality(Locality.CONTAINER_LOCAL);
    dag.addStream("ConsumptionData4", httpResponseCodeSumOperator.sum,
        httpResponseCodeMapCreator.input).setLocality(Locality.CONTAINER_LOCAL);
    // Second stream into esOutput.input, which stream MapData3 already uses
    dag.addStream("MapData4", httpResponseCodeMapCreator.output,
        esOutput.input).setLocality(Locality.CONTAINER_LOCAL);
}
==========================================================
java.lang.IllegalArgumentException: Port input already connected to stream LogicalPlan.StreamMeta[id=MapData3]
    at com.datatorrent.stram.plan.logical.LogicalPlan$StreamMeta.addSink(LogicalPlan.java:548)
    at com.datatorrent.stram.plan.logical.LogicalPlan.addStream(LogicalPlan.java:1429)
    at com.datatorrent.stram.plan.logical.LogicalPlan.addStream(LogicalPlan.java:1480)
    at com.datatorrent.stram.plan.logical.LogicalPlan.addStream(LogicalPlan.java:125)
    at org.jio.media.Application.populateDAG(Application.java:99)
On Sat, May 13, 2017 at 10:55 AM, rohit garg <ro...@gmail.com> wrote:
>
> Did any one got a chance to look at it ..
>
> --
> ---------------RohitGarg.