Posted to users@apex.apache.org by Jim <ji...@facility.supplies> on 2016/08/26 20:01:20 UTC

HDHT question - looking for the datatorrent gurus!

Good afternoon,

I have an Apex application where I may receive EDI transactions, but sometimes they arrive out of order, and I want to hold any out-of-sequence transactions until the correct point in the flow to process them.

For example, for a standard order we will receive from the remote vendor:


1.)    General Acknowledgement

2.)    Detailed Acknowledgement

3.)    Ship Notification

4.)    Invoice

They are supposed to be sent and received in that order.

However, sometimes vendors' systems have problems and send all of these at the same time, so we can receive them out of sequence.  Data packets for these are very small, say from 1 to 512 bytes, and when they do arrive out of sequence, they arrive very close together.

I am trying to think of the best way to do this with my DataTorrent / Hadoop / YARN facilities, instead of creating a table in PostgreSQL and using that.

Can I create a flow that works like this (I am not sure if this makes sense, or if it is the best way to solve my problem while keeping state maintained for all the operators); a rough sketch of the idea follows the list:


1.)    In the inbound transaction router, check the HDHT store for the order number.  If it doesn't exist, this means it is a new order: if the transaction being processed is the general acknowledgement, emit the data to the general acknowledgement operator; if it is not, store the transaction data into the correct bucket identifying the transaction it is for, and record the next expected step as the general acknowledgement in HDHT by order number.

2.)    Say the next transaction is the ship notification.  In the router, we would check the HDHT store, see this is not the next expected transaction (say it is supposed to be the detailed acknowledgement), so we would just post the data for the ship notification into the HDHT store and say we are done.

3.)    Say we now receive the detailed acknowledgement for an order whose next step IS the detailed acknowledgement.  We would see this is the correct next transaction, emit it to the detailed acknowledgement operator, and update the HDHT store to show that the next transaction should be the ship notification.  NOTE:  we can't emit the ship notification yet, until we have confirmed that the detailed acknowledgement has been completed.

4.)    In each of the 4 transaction operators, at the end of processing we would update the HDHT store to show the next expected step, and if we already received data for the next expected step, pull it from the HDHT store and write the transaction into our SQS queue, which is the input into the inbound transaction router at the beginning of the application, so it processes through the system.
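
Roughly, I am imagining something like the following (just a sketch; a plain in-memory map stands in for the store here, and the EdiTransaction type, step codes, and method names are all made up for illustration):

import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical sequencing logic for steps 1.) through 4.) above.  A real
// version would keep nextStep/held in a fault-tolerant store, not plain maps.
class OrderSequencer {
    // Next expected step code per order: 997 -> 855 -> 856 -> 810.
    private final Map<String, String> nextStep = new HashMap<>();
    // Out-of-sequence transactions held per order until they are due.
    private final Map<String, Queue<EdiTransaction>> held = new HashMap<>();

    /** Steps 1-3: pass the transaction through if it is the expected one, else hold it. */
    EdiTransaction accept(EdiTransaction txn) {
        String expected = nextStep.getOrDefault(txn.orderNumber, "997"); // a new order expects the general ack first
        if (txn.stepCode.equals(expected)) {
            return txn;                       // in order: emit downstream
        }
        held.computeIfAbsent(txn.orderNumber, k -> new ArrayDeque<>()).add(txn);
        return null;                          // out of order: hold it
    }

    /** Step 4: a downstream operator finished; advance the order and release a held transaction if one is now due. */
    EdiTransaction complete(String orderNumber) {
        String expected = followingStep(nextStep.getOrDefault(orderNumber, "997"));
        nextStep.put(orderNumber, expected);
        Queue<EdiTransaction> q = held.get(orderNumber);
        if (q != null) {
            for (EdiTransaction t : q) {
                if (t.stepCode.equals(expected)) {
                    q.remove(t);              // safe: we return immediately after removing
                    return t;
                }
            }
        }
        return null;
    }

    private String followingStep(String step) {
        switch (step) {
            case "997": return "855";         // general ack -> detailed ack
            case "855": return "856";         // detailed ack -> ship notification
            case "856": return "810";         // ship notification -> invoice
            default:    return "done";
        }
    }
}

// Hypothetical minimal transaction record.
class EdiTransaction {
    String orderNumber;
    String stepCode;    // "997", "855", "856" or "810"
    byte[] payload;
}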

I believe HDHT can be used to pass data throughout an entire application, and is not limited to a per-operator basis, correct?

Any comments / feedback?

Thanks,

Jim

Re: HDHT question - looking for the datatorrent gurus!

Posted by Thomas Weise <th...@gmail.com>.
HDHT is for *embedded* state management, fully encapsulated by the
operator. It cannot be used like a central database.

Thanks,
Thomas
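
To illustrate the embedded-state pattern: state that belongs to one operator is checkpointed and restored with that operator, and no other operator can reach it.  A sketch, assuming the usual BaseOperator/DefaultInputPort idiom (the BaseOperator package has moved between Apex versions) and using a HashMap where HDHT would hold larger-than-memory state; EdiTransaction is the hypothetical record from the sketch in the first message:

import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.common.util.BaseOperator;

import java.util.HashMap;
import java.util.Map;

// Sketch of embedded state: the non-transient field below is checkpointed
// and restored with this operator, and no other operator touches it.
// HDHT plays the same role when the state is too large to keep in memory.
public class EmbeddedStateOperator extends BaseOperator {
    private final Map<String, byte[]> state = new HashMap<>();

    public final transient DefaultInputPort<EdiTransaction> input =
        new DefaultInputPort<EdiTransaction>() {
            @Override
            public void process(EdiTransaction txn) {
                state.put(txn.orderNumber, txn.payload);  // private to this operator
            }
        };
}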


On Thu, Sep 1, 2016 at 9:49 AM, Tushar Gosavi <tu...@datatorrent.com> wrote:

> Hi Jim,
>
> Currently HDHT is accessible only to a single operator in a DAG. A single
> HDHT store cannot be managed by two different operators at a time, as that
> could cause metadata corruption. Theoretically an HDHT bucket could be
> read from multiple operators, but only one writer is allowed.
>
> In your case each stage of a transaction is processed completely by a
> different operator, and only then can the next stage start. This could
> still be achieved by using a single operator which manages the HDHT state,
> and having a loop in the DAG to send completed transaction ids back to the
> sequencer.
>
> - The sequence operator will emit transactions to the transaction
> processing operators.
> - If it receives an out-of-order transaction, it will note it down in HDHT.
> - Each processing operator will send the completed transaction id on a
> port which is connected back to the sequence operator.
> - On receiving data on this loopback port, the sequence operator will
> update HDHT, search for the next transaction in order (which may be stored
> in HDHT), and emit it to the next processing operator. (A sketch of this
> loop follows after this message.)
>
> - Tushar.
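
A sketch of the loopback sequencer described above, reusing the hypothetical OrderSequencer and EdiTransaction types from the sketch in the first message; the port wiring follows the usual Apex BaseOperator/DefaultInputPort idiom (package names vary by version), and the HDHT persistence itself is elided:

import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.BaseOperator;

// Sequence operator with a loopback input, per the outline above.  State
// lives in the hypothetical OrderSequencer; a real version would back it
// with HDHT inside this one operator.
public class SequenceOperator extends BaseOperator {
    private OrderSequencer sequencer = new OrderSequencer();

    public final transient DefaultOutputPort<EdiTransaction> out =
        new DefaultOutputPort<EdiTransaction>();

    // New transactions arriving from the router.
    public final transient DefaultInputPort<EdiTransaction> in =
        new DefaultInputPort<EdiTransaction>() {
            @Override
            public void process(EdiTransaction txn) {
                EdiTransaction ready = sequencer.accept(txn);
                if (ready != null) {
                    out.emit(ready);          // in order: send it on
                }
            }
        };

    // Completed order ids looped back from the processing operators.
    public final transient DefaultInputPort<String> completed =
        new DefaultInputPort<String>() {
            @Override
            public void process(String orderNumber) {
                EdiTransaction next = sequencer.complete(orderNumber);
                if (next != null) {
                    out.emit(next);           // a held transaction is now due
                }
            }
        };
}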

Re: HDHT question - looking for the datatorrent gurus!

Posted by Thomas Weise <th...@gmail.com>.
Hi Jim,

It is a generic pass-through that will be recognized by the engine as a
valid loop in the graph. Send the output of your operator to the delay
operator and then the output of the delay operator back to the upstream
operator to form the loop.

Thomas
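
For concreteness, a sketch of that wiring against one of the loopback streams from Jim's application quoted later in the thread, assuming the DefaultDelayOperator that ships with Apex core (package recalled as com.datatorrent.common.util, port names recalled as input/output; verify against your version):

import com.datatorrent.common.util.DefaultDelayOperator;

// Inside populateDAG(...): close the loop through a delay operator so the
// logical plan validator accepts the cycle.
DefaultDelayOperator<Long> delay810 =
    dag.addOperator("delay810", new DefaultDelayOperator<Long>());
dag.addStream("loopBack810", operator810.loopbackPort810, delay810.input);
dag.addStream("loopBack810Delayed", delay810.output, ediRouter.loopbackPort810);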

On Sat, Sep 3, 2016 at 9:53 AM, Jim <ji...@facility.supplies> wrote:

> Hi Thomas,
>
>
>
> I saw that example, and wasn’t quite following it.
>
>
>
> Is the DelayOperator just a generic pass-through that we put our output
> into, and then read from in the iterative operator we want to go back to,
> so that it handles the correct windowing, etc.?
>
>
>
> Jim
>
>
>
> *From:* Thomas Weise [mailto:thomas.weise@gmail.com]
> *Sent:* Saturday, September 3, 2016 11:47 AM
>
> *To:* users@apex.apache.org
> *Subject:* Re: HDHT question - looking for the datatorrent gurus!
>
>
>
> Jim,
>
>
>
> You need to use the delay operator for iterative processing. Here is an
> example:
>
>
>
> https://github.com/apache/apex-malhar/blob/master/demos/iteration/src/main/java/com/datatorrent/demos/iteration/Application.java
>
>
>
> Thomas

RE: HDHT question - looking for the datatorrent gurus!

Posted by Jim <ji...@facility.supplies>.
Hi Thomas,

I saw that example, and wasn’t quite following it.

Is the DelayOperator just a generic pass-through that we put our output into, and then read from in the iterative operator we want to go back to, so that it handles the correct windowing, etc.?

Jim

From: Thomas Weise [mailto:thomas.weise@gmail.com]
Sent: Saturday, September 3, 2016 11:47 AM
To: users@apex.apache.org
Subject: Re: HDHT question - looking for the datatorrent gurus!

Jim,

You need to use the delay operator for iterative processing. Here is an example:

https://github.com/apache/apex-malhar/blob/master/demos/iteration/src/main/java/com/datatorrent/demos/iteration/Application.java

Thomas

On Sat, Sep 3, 2016 at 9:05 AM, Jim <ji...@facility.supplies> wrote:
Tushar,

I am trying to implement what you described, and I get a validation error:  Loops In Graph.  I can't seem to find any reference to this.

Below I have pasted my Application.java file; what could be causing this error?

java.lang.RuntimeException: Error creating local cluster

        at com.datatorrent.stram.LocalModeImpl.getController(LocalModeImpl.java:78)
        at supplies.facility.edi.sellars.ApplicationTest.testApplication(ApplicationTest.java:30)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
        at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
        at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
        at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
        at org.junit.runners.BlockJUnit4ClassRunner.runNotIgnored(BlockJUnit4ClassRunner.java:79)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:71)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:49)
        at org.junit.runners.ParentRunner$3.run(ParentRunner.java:193)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:52)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:191)
        at org.junit.runners.ParentRunner.access$000(ParentRunner.java:42)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:184)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:236)
        at org.junit.runner.JUnitCore.run(JUnitCore.java:157)
        at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:117)
        at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:42)
        at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:262)
        at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:84)
Caused by: javax.validation.ValidationException: Loops in graph: [[OperatorMeta{name=operator997, operator=FunctionalAcknowledgmentOperator{name=null}, attributes={Attribute{defaultValue=1024, name=com.datatorrent.api.Context.OperatorContext.MEMORY_MB, codec=com.datatorrent.api.StringCodec$Integer2String@7b7fdc8}=192}}, OperatorMeta{name=operator856, operator=ShipNotificationOperator{name=null}, attributes={Attribute{defaultValue=1024, name=com.datatorrent.api.Context.OperatorContext.MEMORY_MB, codec=com.datatorrent.api.StringCodec$Integer2String@7b7fdc8}=192}}, OperatorMeta{name=operator855, operator=POAcknowledgmentOperator{name=null}, attributes={Attribute{defaultValue=1024, name=com.datatorrent.api.Context.OperatorContext.MEMORY_MB, codec=com.datatorrent.api.StringCodec$Integer2String@7b7fdc8}=192}}, OperatorMeta{name=ediRouter, operator=EDIRoutingOperator{name=null}, attributes={Attribute{defaultValue=1024, name=com.datatorrent.api.Context.OperatorContext.MEMORY_MB, codec=com.datatorrent.api.StringCodec$Integer2String@7b7fdc8}=512}}, OperatorMeta{name=operator810, operator=InvoiceOperator{name=null}, attributes={Attribute{defaultValue=1024, name=com.datatorrent.api.Context.OperatorContext.MEMORY_MB, codec=com.datatorrent.api.StringCodec$Integer2String@7b7fdc8}=192}}]]
        at com.datatorrent.stram.plan.logical.LogicalPlan.validate(LogicalPlan.java:1775)
        at com.datatorrent.stram.StramLocalCluster.<init>(StramLocalCluster.java:278)
        at com.datatorrent.stram.LocalModeImpl.getController(LocalModeImpl.java:76)
        ... 23 more

============================================  Application.java starts here ============================================================

/**
 * Put your copyright and license info here.
 */
package supplies.facility.edi.sellars;

import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.contrib.kinesis.AbstractKinesisInputOperator;
import com.datatorrent.contrib.kinesis.KinesisConsumer;
import com.datatorrent.contrib.kinesis.KinesisStringInputOperator;
import com.datatorrent.contrib.kinesis.ShardManager;
import com.datatorrent.lib.db.jdbc.JdbcStore;
import com.datatorrent.netlet.util.DTThrowable;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import supplies.facility.apex.contrib.elasticsearch.ElasticSearchMapOutputOperator;
import supplies.facility.edi.helpers.EdiSmtpOutputOperator;
import supplies.facility.edi.helpers.KinesisEdiTransactionSetInputOperator;
import supplies.facility.edi.helpers.SqsTransactionSetInputOperator;
import supplies.facility.edi.helpers.StatefulShardManager;

import java.sql.PreparedStatement;
import java.sql.ResultSet;

@ApplicationAnnotation(name="FsEdiSellars")
public class Application implements StreamingApplication {
  private final Locality locality = null;

  private static final Logger LOG = LoggerFactory.getLogger(Application.class);

  private JdbcStore jdbcStore;

  @Override
  public void populateDAG(DAG dag, Configuration conf) {

    try {
      //Read from SQS FS_EDI_SELLARS
      //SQSConnectionFactory
      SqsTransactionSetInputOperator sqsReader = dag.addOperator("TransactionsQueue", new SqsTransactionSetInputOperator());

      // Define all the operators
      EDIRoutingOperator ediRouter = dag.addOperator("ediRouter", new EDIRoutingOperator());
      InvoiceOperator operator810 = dag.addOperator("operator810", new InvoiceOperator());
      POAcknowledgmentOperator operator855 = dag.addOperator("operator855", new POAcknowledgmentOperator());
      ShipNotificationOperator operator856 = dag.addOperator("operator856", new ShipNotificationOperator());
      FunctionalAcknowledgmentOperator operator997 = dag.addOperator("operator997", new FunctionalAcknowledgmentOperator());

      // Set up the EDIRoutingOperator that takes the inbound EDI data, and puts it into multiple
      // outbound streams for each of the applicable EDI transactions:
      //       810 = Invoice
      //       855 = Purchase Order Acknowledgment
      //       856 = Automatic Ship Notification(ASN)
      //       997 = Functional Acknowledgment
      // as well as one for an elasticsearch logger handler to log all incoming transactions.
      dag.addStream("ediInboundStream", sqsReader.output, ediRouter.inboundEdiPort).setLocality(locality.CONTAINER_LOCAL);
      dag.addStream("loopBack855", operator855.loopbackPort855, ediRouter.loopbackPort855).setLocality(locality.CONTAINER_LOCAL);
      dag.addStream("loopBack856", operator856.loopbackPort856, ediRouter.loopbackPort856).setLocality(locality.CONTAINER_LOCAL);
      dag.addStream("loopBack810", operator810.loopbackPort810, ediRouter.loopbackPort810).setLocality(locality.CONTAINER_LOCAL);
      dag.addStream("loopBack997", operator997.loopbackPort997, ediRouter.loopbackPort997).setLocality(locality.CONTAINER_LOCAL);

      // Set up the Invoice operator and tie it to the output stream created in the EDIRouter
      dag.addStream("invoices", ediRouter.outputPort810, operator810.InputPort).setLocality(locality);

      // Set up the Purchase Order Acknowledgment operator and tie it to the output stream created in the EDIRouter
      dag.addStream("PoAcknowledgments", ediRouter.outputPort855, operator855.InputPort).setLocality(locality);

      // Set up the Ship Notification operator and tie it to the output stream created in the EDIRouter
      dag.addStream("shipNotifications", ediRouter.outputPort856, operator856.InputPort).setLocality(locality);

      // Set up the Functional Acknowledgment operator and tie it to the output stream created in the EDIRouter
      dag.addStream("functionalAcknowledgments", ediRouter.outputPort997, operator997.InputPort).setLocality(locality);

      // Set up the elasticsearch operator and tie it to the output stream created in the EDIRouter
      ElasticSearchMapOutputOperator operatorEs = dag.addOperator("operatorEs", new ElasticSearchMapOutputOperator());
      dag.addStream("esLogger", ediRouter.outputPortEs, operatorEs.input).setLocality(locality);

      // Set up the smtp output operator to use for the Purchase Order Acknowledgment messages
      EdiSmtpOutputOperator operatorSmtp855 = dag.addOperator("operatorSmtp855", new EdiSmtpOutputOperator());
      dag.addStream("smtpOutput855", operator855.outputEmails, operatorSmtp855.input).setLocality(locality);

      // Set up the smtp output operator to use for the Ship Notification messages
      EdiSmtpOutputOperator operatorSmtp856 = dag.addOperator("operatorSmtp856", new EdiSmtpOutputOperator());
      dag.addStream("smtpOutput856", operator856.outputEmails, operatorSmtp856.input).setLocality(locality);

    } catch (Exception exc) {
      DTThrowable.rethrow(exc);
    }
  }
}


==============================================  here is one of the operator files, stripped of everything but the module declarations =============

import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.annotation.Stateless;
import org.milyn.payload.StringResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
import supplies.facility.edi.helpers.AbstractEDIProcessor;

/**
 * This Operator handles the processing for the EDI 810, Invoice transactions.
 *
 * As invoices arrive, they are added to the accounting invoicing system for payment,
 * and the order status is updated to show that the item is now closed.
 *
 * Created by jradke on 2/7/2016.
 */
@Stateless
public class InvoiceOperator extends AbstractEDIProcessor {

    private static final Logger logger = LoggerFactory.getLogger(InvoiceOperator.class);

    public InvoiceOperator() {
        super("files/smooks-config810.xml");
    }

    public transient DefaultOutputPort<Long> loopbackPort810 = new DefaultOutputPort<Long>();

    @Override
    protected void processXML(Document xmlDocument) {
        logger.trace("Entered InvoiceOperator processXML");
    }
}

Thanks!

Jim

-----Original Message-----
From: Tushar Gosavi [mailto:tushar@datatorrent.com]
Sent: Thursday, September 1, 2016 1:06 PM
To: users@apex.apache.org
Subject: Re: HDHT question - looking for the datatorrent gurus!

Hi Jim,

Yes, this is what I had in mind. The managed state operator needs to have a separate input for each of the 5 operators. The platform does not support connecting multiple output ports to a single input port, but you could achieve a similar effect using the stream merge operator
(https://github.com/apache/apex-malhar/blob/3ce83708f795b081d564be357a8333928154398e/library/src/main/java/com/datatorrent/lib/stream/StreamMerger.java)

- Tushar.
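
A sketch of that fan-in, cascading the two-way StreamMerger so the four loopback streams end up on one input; the port names (data1, data2, out) are recalled from the linked Malhar class and should be verified, and the operator/stream names are made up:

// Inside populateDAG(...): cascade three two-way mergers so the four
// loopback streams end up on the single sequencer input (port names
// data1/data2/out are assumed from the linked StreamMerger class).
StreamMerger<Long> mergeA = dag.addOperator("mergeA", new StreamMerger<Long>());
StreamMerger<Long> mergeB = dag.addOperator("mergeB", new StreamMerger<Long>());
StreamMerger<Long> mergeC = dag.addOperator("mergeC", new StreamMerger<Long>());

dag.addStream("loop855", operator855.loopbackPort855, mergeA.data1);
dag.addStream("loop856", operator856.loopbackPort856, mergeA.data2);
dag.addStream("loop810", operator810.loopbackPort810, mergeB.data1);
dag.addStream("loop997", operator997.loopbackPort997, mergeB.data2);
dag.addStream("mergedAB", mergeA.out, mergeC.data1);
dag.addStream("mergedCD", mergeB.out, mergeC.data2);
// mergeC.out then feeds the managed state operator's single loopback input
// (through a delay operator, as discussed earlier in the thread).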


On Thu, Sep 1, 2016 at 10:37 PM, Jim <ji...@facility.supplies> wrote:
> Tushar,
>
> Funny that you described it that way, as that is exactly what I was thinking about this morning.
>
>
> So the flow would be:
>
>
>
> Router Operator
>        |
> Managed State Operator
>        |
>        +---> General Acknowledgement -----+
>        +---> Detailed Acknowledgement ----+
>        +---> Ship Notification -----------+
>        +---> Invoice ---------------------+
>                                           |
>        (each of the 4 operators, at the end of processing, emits a
>         record back to the Managed State Operator)
>
>
> In this scenario, would the managed state operator just have one input
> that all the other operators emit to, or would it need separate inputs for
> each of the 5 operators that would be emitting to it?
>
> This is what you were describing too, correct?
>
> Thanks,
>
> Jim


RE: HDHT question - looking for the datatorrent gurus!

Posted by Jim <ji...@facility.supplies>.
That all works!

Thanks Thomas,

Jim

From: Thomas Weise [mailto:thomas.weise@gmail.com]
Sent: Saturday, September 3, 2016 11:47 AM
To: users@apex.apache.org
Subject: Re: HDHT question - looking for the datatorrent gurus!

Jim,

You need to use the delay operator for iterative processing. Here is an example:

https://github.com/apache/apex-malhar/blob/master/demos/iteration/src/main/java/com/datatorrent/demos/iteration/Application.java

Thomas

On Sat, Sep 3, 2016 at 9:05 AM, Jim <ji...@facility.supplies>> wrote:
Tushar,

I am trying to implement what you described, and I get a validation error:  Loops In Graph, and can't seem to find any reference to this.

Below I have pasted my Application.java file; what could be causing this error:

java.lang.RuntimeException: Error creating local cluster

        at com.datatorrent.stram.LocalModeImpl.getController(LocalModeImpl.java:78)
        at supplies.facility.edi.sellars.ApplicationTest.testApplication(ApplicationTest.java:30)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
        at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
        at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
        at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
        at org.junit.runners.BlockJUnit4ClassRunner.runNotIgnored(BlockJUnit4ClassRunner.java:79)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:71)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:49)
        at org.junit.runners.ParentRunner$3.run(ParentRunner.java:193)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:52)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:191)
        at org.junit.runners.ParentRunner.access$000(ParentRunner.java:42)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:184)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:236)
        at org.junit.runner.JUnitCore.run(JUnitCore.java:157)
        at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:117)
        at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:42)
        at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:262)
        at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:84)
Caused by: javax.validation.ValidationException: Loops in graph: [[OperatorMeta{name=operator997, operator=FunctionalAcknowledgmentOperator{name=null}, attributes={Attribute{defaultValue=1024, name=com.datatorrent.api.Context.OperatorContext.MEMORY_MB, codec=com.datatorrent.api.StringCodec$Integer2String@7b7fdc8}=192}<mailto:codec=com.datatorrent.api.StringCodec$Integer2String@7b7fdc8%7d=192%7d>}, OperatorMeta{name=operator856, operator=ShipNotificationOperator{name=null}, attributes={Attribute{defaultValue=1024, name=com.datatorrent.api.Context.OperatorContext.MEMORY_MB, codec=com.datatorrent.api.StringCodec$Integer2String@7b7fdc8}=192}<mailto:codec=com.datatorrent.api.StringCodec$Integer2String@7b7fdc8%7d=192%7d>}, OperatorMeta{name=operator855, operator=POAcknowledgmentOperator{name=null}, attributes={Attribute{defaultValue=1024, name=com.datatorrent.api.Context.OperatorContext.MEMORY_MB, codec=com.datatorrent.api.StringCodec$Integer2String@7b7fdc8}=192}<mailto:codec=com.datatorrent.api.StringCodec$Integer2String@7b7fdc8%7d=192%7d>}, OperatorMeta{name=ediRouter, operator=EDIRoutingOperator{name=null}, attributes={Attribute{defaultValue=1024, name=com.datatorrent.api.Context.OperatorContext.MEMORY_MB, codec=com.datatorrent.api.StringCodec$Integer2String@7b7fdc8}=512}<mailto:codec=com.datatorrent.api.StringCodec$Integer2String@7b7fdc8%7d=512%7d>}, OperatorMeta{name=operator810, operator=InvoiceOperator{name=null}, attributes={Attribute{defaultValue=1024, name=com.datatorrent.api.Context.OperatorContext.MEMORY_MB, codec=com.datatorrent.api.StringCodec$Integer2String@7b7fdc8}=192}<mailto:codec=com.datatorrent.api.StringCodec$Integer2String@7b7fdc8%7d=192%7d>}]]
        at com.datatorrent.stram.plan.logical.LogicalPlan.validate(LogicalPlan.java:1775)
        at com.datatorrent.stram.StramLocalCluster.<init>(StramLocalCluster.java:278)
        at com.datatorrent.stram.LocalModeImpl.getController(LocalModeImpl.java:76)
        ... 23 more

============================================  Application.java starts here ============================================================

/**
 * Put your copyright and license info here.
 */
package supplies.facility.edi.sellars;

import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.contrib.kinesis.AbstractKinesisInputOperator;
import com.datatorrent.contrib.kinesis.KinesisConsumer;
import com.datatorrent.contrib.kinesis.KinesisStringInputOperator;
import com.datatorrent.contrib.kinesis.ShardManager;
import com.datatorrent.lib.db.jdbc.JdbcStore;
import com.datatorrent.netlet.util.DTThrowable;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import supplies.facility.apex.contrib.elasticsearch.ElasticSearchMapOutputOperator;
import supplies.facility.edi.helpers.EdiSmtpOutputOperator;
import supplies.facility.edi.helpers.KinesisEdiTransactionSetInputOperator;
import supplies.facility.edi.helpers.SqsTransactionSetInputOperator;
import supplies.facility.edi.helpers.StatefulShardManager;

import java.sql.PreparedStatement;
import java.sql.ResultSet;

@ApplicationAnnotation(name="FsEdiSellars")
public class Application implements StreamingApplication {
  private final Locality locality = null;

  private static final Logger LOG = LoggerFactory.getLogger(Application.class);

  private JdbcStore jdbcStore;

  @Override
  public void populateDAG(DAG dag, Configuration conf) {

    try {
      //Read from SQS FS_EDI_SELLARS
      //SQSConnectionFactory
      SqsTransactionSetInputOperator sqsReader = dag.addOperator("TransactionsQueue", new SqsTransactionSetInputOperator());

      // Define all the operators
      EDIRoutingOperator ediRouter = dag.addOperator("ediRouter", new EDIRoutingOperator());
      InvoiceOperator operator810 = dag.addOperator("operator810", new InvoiceOperator());
      POAcknowledgmentOperator operator855 = dag.addOperator("operator855", new POAcknowledgmentOperator());
      ShipNotificationOperator operator856 = dag.addOperator("operator856", new ShipNotificationOperator());
      FunctionalAcknowledgmentOperator operator997 = dag.addOperator("operator997", new FunctionalAcknowledgmentOperator());

      // Set up the EDIRoutingOperator that takes the inbound EDI data, and puts it into multiple
      // outbound streams for each of the applicable EDI transactions:
      //       810 = Invoice
      //       855 = Purchase Order Acknowledgment
      //       856 = Automatic Ship Notification(ASN)
      //       997 = Functional Acknowledgment
      // as well as one for an elasticsearch logger handler to log all incoming transactions.
      dag.addStream("ediInboundStream", sqsReader.output, ediRouter.inboundEdiPort).setLocality(locality.CONTAINER_LOCAL);
      dag.addStream("loopBack855", operator855.loopbackPort855, ediRouter.loopbackPort855).setLocality(locality.CONTAINER_LOCAL);
      dag.addStream("loopBack856", operator856.loopbackPort856, ediRouter.loopbackPort856).setLocality(locality.CONTAINER_LOCAL);
      dag.addStream("loopBack810", operator810.loopbackPort810, ediRouter.loopbackPort810).setLocality(locality.CONTAINER_LOCAL);
      dag.addStream("loopBack997", operator997.loopbackPort997, ediRouter.loopbackPort997).setLocality(locality.CONTAINER_LOCAL);

      // Set up the Invoice operator and tie it to the output stream created in the EDIRouter
      dag.addStream("invoices", ediRouter.outputPort810, operator810.InputPort).setLocality(locality);

      // Set up the Purchase Order Acknowledgment operator and tie it to the output stream created in the EDIRouter
      dag.addStream("PoAcknowledgments", ediRouter.outputPort855, operator855.InputPort).setLocality(locality);

      // Set up the Functional Acknowledgment operator and tie it to the output stream created in the
      dag.addStream("shipNotifications", ediRouter.outputPort856, operator856.InputPort).setLocality(locality);

      // Set up the Functional Acknowledgment operator and tie it to the output stream created in the EDIRouter
      dag.addStream("functionalAcknowledgments", ediRouter.outputPort997, operator997.InputPort).setLocality(locality);

      // Set up the elasticsearch operator and tie it to the output stream created in the EDIRouter
      ElasticSearchMapOutputOperator operatorEs = dag.addOperator("operatorEs", new ElasticSearchMapOutputOperator());
      dag.addStream("esLogger", ediRouter.outputPortEs, operatorEs.input).setLocality(locality);

      // Set up the smtp output operator to use for the Ship Notificaiton messages
      EdiSmtpOutputOperator operatorSmtp855 = dag.addOperator("operatorSmtp855", new EdiSmtpOutputOperator());
      dag.addStream("smtpOutput855", operator855.outputEmails, operatorSmtp855.input).setLocality(locality);

      // Set up the smtp output operator to use for the Ship Notificaiton messages
      EdiSmtpOutputOperator operatorSmtp856 = dag.addOperator("operatorSmtp856", new EdiSmtpOutputOperator());
      dag.addStream("smtpOutput856", operator856.outputEmails, operatorSmtp856.input).setLocality(locality);

    } catch (Exception exc) {
      DTThrowable.rethrow(exc);
    }
  }
}


==============================================  here is one of the operator files, stripped of everything but the module declarations =============

import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.annotation.Stateless;
import org.milyn.payload.StringResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
import supplies.facility.edi.helpers.AbstractEDIProcessor;

/**
 * This Operator handles the processing for the EDI 810, Invoice transactions.
 *
 * As invoices arrive, they are added to the accounting invoicing system for payment,
 * and the order status is updated to show that the item is now closed.
 *
 * Created by jradke on 2/7/2016.
 */
@Stateless
public class InvoiceOperator extends AbstractEDIProcessor {

    private static final Logger logger = LoggerFactory.getLogger(InvoiceOperator.class);

    public InvoiceOperator() {
        super("files/smooks-config810.xml");
    }

    public transient DefaultOutputPort<Long> loopbackPort810 = new DefaultOutputPort<Long>();

    @Override
    protected void processXML(Document xmlDocument) {
        logger.trace("Entered InvoiceOperator processXML");
    }
}
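
For the loopback to carry anything, processXML would presumably also emit the completed order number once the invoice has been handled — along these lines (a sketch only; the order-number extraction helper is hypothetical):

    @Override
    protected void processXML(Document xmlDocument) {
        logger.trace("Entered InvoiceOperator processXML");
        // ... post the invoice to accounting and close the order ...
        long orderNumber = extractOrderNumber(xmlDocument);  // hypothetical helper
        loopbackPort810.emit(orderNumber);                   // signal completion back to the router
    }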

Thanks!

Jim

-----Original Message-----
From: Tushar Gosavi [mailto:tushar@datatorrent.com]
Sent: Thursday, September 1, 2016 1:06 PM
To: users@apex.apache.org
Subject: Re: HDHT question - looking for the datatorrent gurus!

Hi Jim,
Yes, this is what I had in mind. The managed state operator needs to have a separate input for each of the 5 operators. The platform does not support connecting multiple output ports to a single input port, but you could achieve a similar effect using the stream merge operator
(https://github.com/apache/apex-malhar/blob/3ce83708f795b081d564be357a8333928154398e/library/src/main/java/com/datatorrent/lib/stream/StreamMerger.java)
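
For illustration, a cascade over the four loopback streams might look like this (a sketch only — StreamMerger exposes two input ports, data1 and data2, so three mergers are needed; the merger and stream names, and the router's single combined loopback port, are assumptions, while the other names follow the Application.java above):

StreamMerger<Long> merge1 = dag.addOperator("merge1", new StreamMerger<Long>());
dag.addStream("m855", operator855.loopbackPort855, merge1.data1);   // 855 completions
dag.addStream("m856", operator856.loopbackPort856, merge1.data2);   // 856 completions

StreamMerger<Long> merge2 = dag.addOperator("merge2", new StreamMerger<Long>());
dag.addStream("m810", operator810.loopbackPort810, merge2.data1);   // 810 completions
dag.addStream("m997", operator997.loopbackPort997, merge2.data2);   // 997 completions

StreamMerger<Long> mergeAll = dag.addOperator("mergeAll", new StreamMerger<Long>());
dag.addStream("mergeLeft", merge1.out, mergeAll.data1);
dag.addStream("mergeRight", merge2.out, mergeAll.data2);
dag.addStream("completedIds", mergeAll.out, ediRouter.loopbackPort); // assumed single loopback input port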

- Tushar.


On Thu, Sep 1, 2016 at 10:37 PM, Jim <ji...@facility.supplies> wrote:
> Tushar,
>
> Funny that you described it that way, as that is exactly what I was thinking about this morning.
>
>
> So the flow would be:
>
>                         Router Operator
>                                |
>                       Managed State Operator
>                                |
>      ----------------------------------------------------------------
>      |                   |                    |                      |
>   General             Detailed              Ship                 Invoice
> Acknowledgement     Acknowledgement     Notification
>      |                   |                    |                      |
>      ----------------------------------------------------------------
>                                |
>      / each of the 4 operators, at the end of processing, emits a
>        record back to the Managed State Operator /
>
>
> In this scenario, would the managed state operator just have 1 input,
> that all the other operators emit to, or would it need to have separate inputs for each of the 5 operators that would be emitting to it?
>
> This is what you were describing too, correct?
>
> Thanks,
>
> Jim
>
> -----Original Message-----
> From: Tushar Gosavi [mailto:tushar@datatorrent.com]
> Sent: Thursday, September 1, 2016 11:49 AM
> To: users@apex.apache.org
> Subject: Re: HDHT question - looking for the datatorrent gurus!
>
> Hi Jim,
>
> Currently HDHT is accessible only to a single operator in a DAG. A single HDHT store cannot be managed by two different operators at a time, as that could cause metadata corruption. Theoretically an HDHT bucket could be read from multiple operators, but only one writer is allowed.
>
> In your case each stage of a transaction is processed completely by a different operator, and only then can the next stage start. This can still be achieved by using a single operator which manages the HDHT state, and having a loop in the DAG to send completed transaction ids back to the sequencer.
>
> - The sequence operator will emit each transaction to its transaction processing operator.
> - If it receives an out-of-order transaction it will note it down in HDHT.
> - The processing operator will send the completed transaction id on a port which is connected back to the sequence operator.
> - On receiving data on this loopback port, the sequence operator will update HDHT and search for the next transaction in order, which may already be stored in HDHT, and emit it to the next processing operator.
>
> - Tushar.
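
In code, the sequencing logic described above might look roughly like this (a sketch only — OrderState and the load/save/stash/emit helpers are all hypothetical, since the exact HDHT calls are not shown in this thread):

// Hypothetical per-tuple logic for the sequence (managed state) operator.
public void processTransaction(EdiTransaction txn) {
  OrderState state = load(txn.orderNumber);        // hypothetical HDHT read
  if (txn.type == state.nextExpectedType() && !state.inFlight) {
    emitToProcessor(txn);                          // in order: forward it downstream
    state.inFlight = true;
  } else {
    state.stash(txn);                              // out of order: hold it in HDHT
  }
  save(state);                                     // hypothetical HDHT write
}

// Handler for the loopback port: mark the stage complete and release a
// stashed follow-on transaction if one arrived early.
public void processCompleted(long orderNumber) {
  OrderState state = load(orderNumber);
  state.advance();                                 // move to the next expected stage
  state.inFlight = false;
  EdiTransaction next = state.unstash(state.nextExpectedType());
  if (next != null) {
    emitToProcessor(next);
    state.inFlight = true;
  }
  save(state);
}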


Re: HDHT question - looking for the datatorrent gurus!

Posted by Thomas Weise <th...@gmail.com>.
Jim,

You need to use the delay operator for iterative processing. Here is an
example:

https://github.com/apache/apex-malhar/blob/master/demos/iteration/src/main/java/com/datatorrent/demos/iteration/Application.java
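
For one of the loopback edges in the application quoted further down this thread, that would look roughly like the following (a sketch assuming DefaultDelayOperator from apex-core's com.datatorrent.common.util; repeat for each loop):

DefaultDelayOperator<Long> delay810 =
    dag.addOperator("delay810", new DefaultDelayOperator<Long>());
// route the completion ids through the delay so the cycle validates as an iteration
dag.addStream("loopBack810", operator810.loopbackPort810, delay810.input);
dag.addStream("loopBack810Delayed", delay810.output, ediRouter.loopbackPort810);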

Thomas

On Sat, Sep 3, 2016 at 9:05 AM, Jim <ji...@facility.supplies> wrote:

> Tushar,
>
> I am trying to implement what you described, and I get a validation
> error:  Loops In Graph, and can't seem to find any reference to this.
>

RE: HDHT question - looking for the datatorrent gurus!

Posted by Jim <ji...@facility.supplies>.
Tushar,

I am trying to implement what you described, and I get a validation error:  Loops In Graph, and can't seem to find any reference to this.  

Below I have pasted my Application.java file; what could be causing this error:

java.lang.RuntimeException: Error creating local cluster

	at com.datatorrent.stram.LocalModeImpl.getController(LocalModeImpl.java:78)
	at supplies.facility.edi.sellars.ApplicationTest.testApplication(ApplicationTest.java:30)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
	at org.junit.runners.BlockJUnit4ClassRunner.runNotIgnored(BlockJUnit4ClassRunner.java:79)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:71)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:49)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:193)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:52)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:191)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:42)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:184)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:236)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:157)
	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:117)
	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:42)
	at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:262)
	at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:84)
Caused by: javax.validation.ValidationException: Loops in graph: [[OperatorMeta{name=operator997, operator=FunctionalAcknowledgmentOperator{name=null}, attributes={Attribute{defaultValue=1024, name=com.datatorrent.api.Context.OperatorContext.MEMORY_MB, codec=com.datatorrent.api.StringCodec$Integer2String@7b7fdc8}=192}}, OperatorMeta{name=operator856, operator=ShipNotificationOperator{name=null}, attributes={Attribute{defaultValue=1024, name=com.datatorrent.api.Context.OperatorContext.MEMORY_MB, codec=com.datatorrent.api.StringCodec$Integer2String@7b7fdc8}=192}}, OperatorMeta{name=operator855, operator=POAcknowledgmentOperator{name=null}, attributes={Attribute{defaultValue=1024, name=com.datatorrent.api.Context.OperatorContext.MEMORY_MB, codec=com.datatorrent.api.StringCodec$Integer2String@7b7fdc8}=192}}, OperatorMeta{name=ediRouter, operator=EDIRoutingOperator{name=null}, attributes={Attribute{defaultValue=1024, name=com.datatorrent.api.Context.OperatorContext.MEMORY_MB, codec=com.datatorrent.api.StringCodec$Integer2String@7b7fdc8}=512}}, OperatorMeta{name=operator810, operator=InvoiceOperator{name=null}, attributes={Attribute{defaultValue=1024, name=com.datatorrent.api.Context.OperatorContext.MEMORY_MB, codec=com.datatorrent.api.StringCodec$Integer2String@7b7fdc8}=192}}]]
	at com.datatorrent.stram.plan.logical.LogicalPlan.validate(LogicalPlan.java:1775)
	at com.datatorrent.stram.StramLocalCluster.<init>(StramLocalCluster.java:278)
	at com.datatorrent.stram.LocalModeImpl.getController(LocalModeImpl.java:76)
	... 23 more

============================================  Application.java starts here ============================================================

/**
 * Put your copyright and license info here.
 */
package supplies.facility.edi.sellars;

import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.contrib.kinesis.AbstractKinesisInputOperator;
import com.datatorrent.contrib.kinesis.KinesisConsumer;
import com.datatorrent.contrib.kinesis.KinesisStringInputOperator;
import com.datatorrent.contrib.kinesis.ShardManager;
import com.datatorrent.lib.db.jdbc.JdbcStore;
import com.datatorrent.netlet.util.DTThrowable;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import supplies.facility.apex.contrib.elasticsearch.ElasticSearchMapOutputOperator;
import supplies.facility.edi.helpers.EdiSmtpOutputOperator;
import supplies.facility.edi.helpers.KinesisEdiTransactionSetInputOperator;
import supplies.facility.edi.helpers.SqsTransactionSetInputOperator;
import supplies.facility.edi.helpers.StatefulShardManager;

import java.sql.PreparedStatement;
import java.sql.ResultSet;

@ApplicationAnnotation(name="FsEdiSellars")
public class Application implements StreamingApplication {
  private final Locality locality = null;

  private static final Logger LOG = LoggerFactory.getLogger(Application.class);

  private JdbcStore jdbcStore;

  @Override
  public void populateDAG(DAG dag, Configuration conf) {

    try {
      //Read from SQS FS_EDI_SELLARS
      //SQSConnectionFactory
      SqsTransactionSetInputOperator sqsReader = dag.addOperator("TransactionsQueue", new SqsTransactionSetInputOperator());

      // Define all the operators
      EDIRoutingOperator ediRouter = dag.addOperator("ediRouter", new EDIRoutingOperator());
      InvoiceOperator operator810 = dag.addOperator("operator810", new InvoiceOperator());
      POAcknowledgmentOperator operator855 = dag.addOperator("operator855", new POAcknowledgmentOperator());
      ShipNotificationOperator operator856 = dag.addOperator("operator856", new ShipNotificationOperator());
      FunctionalAcknowledgmentOperator operator997 = dag.addOperator("operator997", new FunctionalAcknowledgmentOperator());

      // Set up the EDIRoutingOperator that takes the inbound EDI data, and puts it into multiple
      // outbound streams for each of the applicable EDI transactions:
      //       810 = Invoice
      //       855 = Purchase Order Acknowledgment
      //       856 = Automatic Ship Notification(ASN)
      //       997 = Functional Acknowledgment
      // as well as one for an elasticsearch logger handler to log all incoming transactions.
      dag.addStream("ediInboundStream", sqsReader.output, ediRouter.inboundEdiPort).setLocality(locality.CONTAINER_LOCAL);
      dag.addStream("loopBack855", operator855.loopbackPort855, ediRouter.loopbackPort855).setLocality(locality.CONTAINER_LOCAL);
      dag.addStream("loopBack856", operator856.loopbackPort856, ediRouter.loopbackPort856).setLocality(locality.CONTAINER_LOCAL);
      dag.addStream("loopBack810", operator810.loopbackPort810, ediRouter.loopbackPort810).setLocality(locality.CONTAINER_LOCAL);
      dag.addStream("loopBack997", operator997.loopbackPort997, ediRouter.loopbackPort997).setLocality(locality.CONTAINER_LOCAL);

      // Set up the Invoice operator and tie it to the output stream created in the EDIRouter
      dag.addStream("invoices", ediRouter.outputPort810, operator810.InputPort).setLocality(locality);

      // Set up the Purchase Order Acknowledgment operator and tie it to the output stream created in the EDIRouter
      dag.addStream("PoAcknowledgments", ediRouter.outputPort855, operator855.InputPort).setLocality(locality);

      // Set up the Ship Notification operator and tie it to the output stream created in the EDIRouter
      dag.addStream("shipNotifications", ediRouter.outputPort856, operator856.InputPort).setLocality(locality);

      // Set up the Functional Acknowledgment operator and tie it to the output stream created in the EDIRouter
      dag.addStream("functionalAcknowledgments", ediRouter.outputPort997, operator997.InputPort).setLocality(locality);

      // Set up the elasticsearch operator and tie it to the output stream created in the EDIRouter
      ElasticSearchMapOutputOperator operatorEs = dag.addOperator("operatorEs", new ElasticSearchMapOutputOperator());
      dag.addStream("esLogger", ediRouter.outputPortEs, operatorEs.input).setLocality(locality);

      // Set up the smtp output operator to use for the Purchase Order Acknowledgment messages
      EdiSmtpOutputOperator operatorSmtp855 = dag.addOperator("operatorSmtp855", new EdiSmtpOutputOperator());
      dag.addStream("smtpOutput855", operator855.outputEmails, operatorSmtp855.input).setLocality(locality);

      // Set up the smtp output operator to use for the Ship Notification messages
      EdiSmtpOutputOperator operatorSmtp856 = dag.addOperator("operatorSmtp856", new EdiSmtpOutputOperator());
      dag.addStream("smtpOutput856", operator856.outputEmails, operatorSmtp856.input).setLocality(locality);

    } catch (Exception exc) {
      DTThrowable.rethrow(exc);
    }
  }
}


==============================================  here is one of the operator files, stripped of everything but the module declarations =============

import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.annotation.Stateless;
import org.milyn.payload.StringResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
import supplies.facility.edi.helpers.AbstractEDIProcessor;

/**
 * This Operator handles the processing for the EDI 810, Invoice transactions.
 *
 * As invoices arrive, they are added to the accounting invoicing system for payment,
 * and the order status is updated to show that the item is now closed.
 *
 * Created by jradke on 2/7/2016.
 */
@Stateless
public class InvoiceOperator extends AbstractEDIProcessor {

    private static final Logger logger = LoggerFactory.getLogger(InvoiceOperator.class);

    public InvoiceOperator() {
        super("files/smooks-config810.xml");
    }

    public transient DefaultOutputPort<Long> loopbackPort810 = new DefaultOutputPort<Long>();

    @Override
    protected void processXML(Document xmlDocument) {
        logger.trace("Entered InvoiceOperator processXML");
    }
}

Thanks!

Jim


Re: HDHT question - looking for the datatorrent gurus!

Posted by Thomas Weise <th...@gmail.com>.
The user defines how to convert key and value into byte[], so you can use
any serialization mechanism you like (custom, Kryo, JSON, etc.).

Here is an example for setting up the serializer:

https://github.com/DataTorrent/examples/blob/master/tutorials/hdht/src/test/java/com/example/HDHTTestOperator.java

You would replace KeyValPair<byte[], byte[]> with your complex type.
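
For the record described below, a small POJO plus Kryo would do (a sketch, not taken from the example repo — note the 8xx fields are renamed because Java identifiers cannot start with a digit):

import java.io.ByteArrayOutputStream;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

public class OrderState {
    public int orderNumber;   // the key
    public int status;
    public String data855;
    public String data856;
    public String data810;
}

// Inside the HDHT operator (keep one Kryo per operator; Kryo is not thread-safe):
private transient Kryo kryo = new Kryo();

private byte[] toBytes(OrderState state) {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    Output output = new Output(bos);
    kryo.writeObject(output, state);
    output.close();                   // flush into the byte stream
    return bos.toByteArray();
}

private OrderState fromBytes(byte[] bytes) {
    return kryo.readObject(new Input(bytes), OrderState.class);
}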

Thanks,
Thomas

On Thu, Sep 1, 2016 at 11:14 AM, Jim <ji...@facility.supplies> wrote:

> Tushar,
>
> Great!  My final question: I have been searching and haven't found any
> good examples of setting up a complex data type and storing and retrieving
> it from HDHT within an operator.  The data I would want to store would
> look like:
>
> Field Name       Field Type
>
> OrderNumber      integer (this is the key)
> Status           integer
> 855Data          string
> 856Data          string
> 810Data          string
>
> Any examples that you can point me to that show the best way to set up
> access to HDHT?
>
> Thanks,
>
> Jim
>

RE: HDHT question - looking for the datatorrent gurus!

Posted by Jim <ji...@facility.supplies>.
Tushar,

Great!  My final question: I have been searching and haven't found any good examples of setting up a complex data type and storing and retrieving it from HDHT within an operator.  The data I would want to store would look like:

Field Name       Field Type

OrderNumber      integer (this is the key)
Status           integer
855Data          string
856Data          string
810Data          string

Any examples that you can point me to that show the best way to set up access to HDHT?

Thanks,

Jim

-----Original Message-----
From: Tushar Gosavi [mailto:tushar@datatorrent.com] 
Sent: Thursday, September 1, 2016 1:06 PM
To: users@apex.apache.org
Subject: Re: HDHT question - looking for the datatorrent gurus!

Hi Jim,

Yes, this is what I had in mind. The managed state operator needs to have a separate input for each of the 5 operators. The platform does not support connecting multiple output ports to a single input port, but you could achieve a similar effect using the stream merge operator
(https://github.com/apache/apex-malhar/blob/3ce83708f795b081d564be357a8333928154398e/library/src/main/java/com/datatorrent/lib/stream/StreamMerger.java)

- Tushar.


On Thu, Sep 1, 2016 at 10:37 PM, Jim <ji...@facility.supplies> wrote:
> Tushar,
>
> Funny that you described it that way, as that is exactly what I was thinking about this morning.
>
>
> So the flow would be:
>
>
>                                                                                                       Router Operator
>
>                                                                                                                    |
>
>                                                                                                Managed State Operator
>
>                                                                                                                    |
>                                 ---------------------------------------------------------------------------------------------------------------------------------------
>                                |                                                           |                                                              |                                                      |
>
>          General Acknowledgement             Detailed Acknowledgement                       Ship Notification                                  Invoice
>
>                                |                                                           |                                                              |                                                      |
>                                 ---------------------------------------------------------------------------------------------------------------------------------------
>                                                                                                                   |
>                               ------------------------------------------------------------------------------------------------------------------------------------------
>                            /    each of the 4 operators at the end of processing emits a record back to Managed State Operator      /
>                           ------------------------------------------------------------------------------------------------------------------------------------------
>
>
> In this scenario, would the managed state operator just have 1 input, 
> that all the other operators emit to, or would it need to have separate inputs for each of the 5 operators that would be emitting to it?
>
> This is what you were describing too, correct?
>
> Thanks,
>
> Jim
>
> -----Original Message-----
> From: Tushar Gosavi [mailto:tushar@datatorrent.com]
> Sent: Thursday, September 1, 2016 11:49 AM
> To: users@apex.apache.org
> Subject: Re: HDHT question - looking for the datatorrent gurus!
>
> Hi Jim,
>
> Currently HDHT is accessible only to a single operator in a DAG. A single HDHT store cannot be managed by two different operators at a time, as that could cause metadata corruption. Theoretically an HDHT bucket could be read from multiple operators, but only one writer is allowed.
>
> In your case each stage in a transaction is processed completely by a different operator, and only then can the next stage start. This could still be achieved by using a single operator which manages the HDHT state, and having a loop in the DAG to send completed transaction ids back to the sequencer.
>
> - The sequencer operator will emit transactions to the transaction processing operators.
> - If it receives an out-of-order transaction it will note it down in HDHT.
> - Each processing operator will send the completed transaction id on a port which is connected back to the sequencer operator.
> - On receiving data on this loopback port, the sequencer operator will update HDHT and search for the next transaction in order, which could be stored in HDHT, and will emit it to the next processing operator.
>
> - Tushar.
>
>
> On Sat, Aug 27, 2016 at 1:31 AM, Jim <ji...@facility.supplies> wrote:
>> Good afternoon,
>>
>>
>>
>> I have an apex application where I may receive edi transactions, but 
>> sometimes they arrive out of order and I want to hold any out of 
>> sequence transactions till the correct time in the flow to process them.
>>
>>
>>
>> For example for a standard order, we will receive from the remote vendor:
>>
>>
>>
>> 1.)    General Acknowledgement
>>
>> 2.)    Detailed Acknowledgement
>>
>> 3.)    Ship Notification
>>
>> 4.)    Invoice
>>
>>
>>
>> They are supposed to be sent and received in that order.
>>
>>
>>
>> However sometimes vendors’ systems have problems, etc., so they send all
>> of these at the same time, and then we can receive them out of sequence.
>> Data packets for these are very small, say from 1 to 512 bytes, and the
>> only time they will be out of sequence is when we receive them very
>> closely together.
>>
>>
>>
>> I am trying to think of the best way to do this in my datatorrent /
>> Hadoop / yarn facilities, instead of creating a data table in
>> PostgreSQL and using that.
>>
>>
>>
>> Can I create a flow that works like this (I am not sure if this makes 
>> sense, or is the best way to solve my problem, while keeping state, 
>> etc. maintained for all the operators):
>>
>>
>>
>> 1.)    In the inbound transaction router, check the HDHT store for the
>> order number. If it doesn’t exist, this means it is a new order: if the
>> transaction being processed is the general acknowledgment, emit the
>> data to the general acknowledgement operator; if it is not, store the
>> transaction data into the correct bucket identifying which transaction
>> it is for, and record in HDHT, by order number, that the next step is
>> the general acknowledgement.
>>
>> 2.)    Say the next transaction is the ship notification. In the router,
>> we would check the HDHT store, see this is not the next expected
>> transaction (say it is supposed to be the detail acknowledgement), so
>> we would just post the data for the ship notification into the HDHT
>> store and say we are done.
>>
>> 3.)    Say we now receive the detailed acknowledgement for an order whose
>> next step IS the detailed acknowledgement. We would see this is the
>> correct next transaction, emit it to the detailed acknowledgement
>> operator, and update the HDHT store to show that the next transaction
>> should be the ship notification.  NOTE:  we can’t emit the ship
>> notification yet, till we have confirmed that the detailed
>> acknowledgment has been completed.
>>
>> 4.)    In each of the 4 transaction operators, at the end of the
>> processing we would update the HDHT store to show the next expected
>> step, and if we have already received data for the next expected step,
>> pull it from the HDHT store and write the transaction into our SQS
>> queue, which is the input into the inbound transaction router at the
>> beginning of the application, so it processes through the system.
>>
>>
>>
>> I believe HDHT can be used to pass data throughout an entire 
>> application, and is not limited to just a per operator basis, correct?
>>
>>
>>
>> Any comments / feedback?
>>
>>
>>
>> Thanks,
>>
>>
>>
>> Jim

Re: HDHT question - looking for the datatorrent gurus!

Posted by Tushar Gosavi <tu...@datatorrent.com>.
Hi Jim,

Yes, this is what I had in mind. The managed state operator needs to
have a separate input for each of the 5 operators. The platform does
not support connecting multiple output ports to a single input port,
but you could achieve a similar effect using the stream merge operator
(https://github.com/apache/apex-malhar/blob/3ce83708f795b081d564be357a8333928154398e/library/src/main/java/com/datatorrent/lib/stream/StreamMerger.java)
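
For example, a rough sketch of the wiring (the operators genAck, detAck, shipNotif, invoice and sequencer, and the done/completedIn ports, are hypothetical names for your five operators). StreamMerger has two input ports, data1 and data2, so merging four completion streams takes a small cascade:

    // inside populateDAG(); the stage operators are assumed to be added
    // already, each with a DefaultOutputPort<Long> named "done"
    StreamMerger<Long> m1 = dag.addOperator("mergeAcks", new StreamMerger<Long>());
    StreamMerger<Long> m2 = dag.addOperator("mergeShipInv", new StreamMerger<Long>());
    StreamMerger<Long> m3 = dag.addOperator("mergeAll", new StreamMerger<Long>());

    dag.addStream("gaDone", genAck.done, m1.data1);
    dag.addStream("daDone", detAck.done, m1.data2);
    dag.addStream("snDone", shipNotif.done, m2.data1);
    dag.addStream("invDone", invoice.done, m2.data2);
    dag.addStream("acksMerged", m1.out, m3.data1);
    dag.addStream("restMerged", m2.out, m3.data2);
    dag.addStream("completedIds", m3.out, sequencer.completedIn);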

- Tushar.


On Thu, Sep 1, 2016 at 10:37 PM, Jim <ji...@facility.supplies> wrote:
> Tushar,
>
> Funny that you described it that way, as that is exactly what I was thinking about this morning.
>
>
> So the flow would be:
>
>
>                                                                                                       Router Operator
>
>                                                                                                                    |
>
>                                                                                                Managed State Operator
>
>                                                                                                                    |
>                                 ---------------------------------------------------------------------------------------------------------------------------------------
>                                |                                                           |                                                              |                                                      |
>
>          General Acknowledgement             Detailed Acknowledgement                       Ship Notification                                  Invoice
>
>                                |                                                           |                                                              |                                                      |
>                                 ---------------------------------------------------------------------------------------------------------------------------------------
>                                                                                                                   |
>                               ------------------------------------------------------------------------------------------------------------------------------------------
>                            /    each of the 4 operators at the end of processing emits a  record back to Managed State Operator      /
>                           ------------------------------------------------------------------------------------------------------------------------------------------
>
>
> In this scenario, would the managed state operator just have 1 input, that all the other operators emit to, or would it need to have separate inputs for each
> of the 5 operators that would be emitting to it?
>
> This is what you were describing too, correct?
>
> Thanks,
>
> Jim
>
> -----Original Message-----
> From: Tushar Gosavi [mailto:tushar@datatorrent.com]
> Sent: Thursday, September 1, 2016 11:49 AM
> To: users@apex.apache.org
> Subject: Re: HDHT question - looking for the datatorrent gurus!
>
> Hi Jim,
>
> Currently HDHT is accessible only to a single operator in a DAG. A single HDHT store cannot be managed by two different operators at a time, as that could cause metadata corruption. Theoretically an HDHT bucket could be read from multiple operators, but only one writer is allowed.
>
> In your case each stage in a transaction is processed completely by a different operator, and only then can the next stage start. This could still be achieved by using a single operator which manages the HDHT state, and having a loop in the DAG to send completed transaction ids back to the sequencer.
>
> - The sequencer operator will emit transactions to the transaction processing operators.
> - If it receives an out-of-order transaction it will note it down in HDHT.
> - Each processing operator will send the completed transaction id on a port which is connected back to the sequencer operator.
> - On receiving data on this loopback port, the sequencer operator will update HDHT and search for the next transaction in order, which could be stored in HDHT, and will emit it to the next processing operator.
>
> - Tushar.
>
>
> On Sat, Aug 27, 2016 at 1:31 AM, Jim <ji...@facility.supplies> wrote:
>> Good afternoon,
>>
>>
>>
>> I have an apex application where I may receive edi transactions, but
>> sometimes they arrive out of order and I want to hold any out of
>> sequence transactions till the correct time in the flow to process them.
>>
>>
>>
>> For example for a standard order, we will receive from the remote vendor:
>>
>>
>>
>> 1.)    General Acknowledgement
>>
>> 2.)    Detailed Acknowledgement
>>
>> 3.)    Ship Notification
>>
>> 4.)    Invoice
>>
>>
>>
>> They are supposed to be sent and received in that order.
>>
>>
>>
>> However sometimes vendors’ systems have problems, etc., so they send all
>> of these at the same time, and then we can receive them out of sequence.
>> Data packets for these are very small, say from 1 to 512 bytes, and the
>> only time they will be out of sequence is when we receive them very
>> closely together.
>>
>>
>>
>> I am trying to think of the best way to do this in my datatorrent /
>> Hadoop / yarn facilities, instead of creating a data table in
>> PostgreSQL and using that.
>>
>>
>>
>> Can I create a flow that works like this (I am not sure if this makes
>> sense, or is the best way to solve my problem, while keeping state,
>> etc. maintained for all the operators):
>>
>>
>>
>> 1.)    In the inbound transaction router, check the HDHT store for the
>> order number. If it doesn’t exist, this means it is a new order: if the
>> transaction being processed is the general acknowledgment, emit the
>> data to the general acknowledgement operator; if it is not, store the
>> transaction data into the correct bucket identifying which transaction
>> it is for, and record in HDHT, by order number, that the next step is
>> the general acknowledgement.
>>
>> 2.)    Say the next transaction is the ship notification. In the router,
>> we would check the HDHT store, see this is not the next expected
>> transaction (say it is supposed to be the detail acknowledgement), so
>> we would just post the data for the ship notification into the HDHT
>> store and say we are done.
>>
>> 3.)    Say we now receive the detailed acknowledgement for an order whose
>> next step IS the detailed acknowledgement. We would see this is the
>> correct next transaction, emit it to the detailed acknowledgement
>> operator, and update the HDHT store to show that the next transaction
>> should be the ship notification.  NOTE:  we can’t emit the ship
>> notification yet, till we have confirmed that the detailed
>> acknowledgment has been completed.
>>
>> 4.)    In each of the 4 transaction operators, at the end of the
>> processing we would update the HDHT store to show the next expected
>> step, and if we have already received data for the next expected step,
>> pull it from the HDHT store and write the transaction into our SQS
>> queue, which is the input into the inbound transaction router at the
>> beginning of the application, so it processes through the system.
>>
>>
>>
>> I believe HDHT can be used to pass data throughout an entire
>> application, and is not limited to just a per operator basis, correct?
>>
>>
>>
>> Any comments / feedback?
>>
>>
>>
>> Thanks,
>>
>>
>>
>> Jim

RE: HDHT question - looking for the datatorrent gurus!

Posted by Jim <ji...@facility.supplies>.
Tushar,

Funny that you described it that way, as that is exactly what I was thinking about this morning.


So the flow would be:


                                                                                                      Router Operator

                                                                                                                   |

                                                                                               Managed State Operator

                                                                                                                   |
                                ---------------------------------------------------------------------------------------------------------------------------------------
                               |                                                           |                                                              |                                                      |

         General Acknowledgement             Detailed Acknowledgement                       Ship Notification                                  Invoice

                               |                                                           |                                                              |                                                      |
                                ---------------------------------------------------------------------------------------------------------------------------------------
                                                                                                                  |
                              ------------------------------------------------------------------------------------------------------------------------------------------
                           /    each of the 4 operators at the end of processing emits a  record back to Managed State Operator      /
                          ------------------------------------------------------------------------------------------------------------------------------------------


In this scenario, would the managed state operator just have 1 input, that all the other operators emit to, or would it need to have separate inputs for each
of the 5 operators that would be emitting to it?

This is what you were describing too, correct?
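
Also, since Apex validates the DAG as acyclic, I am assuming the loop back to the Managed State Operator has to go through a delay operator (the iterative processing support)?  A rough sketch of what I mean for one of the loopback edges, with all the operator and port names being my own:

    // DefaultDelayOperator ships with apex-core (com.datatorrent.common.util);
    // genAck and managedState stand in for the operators in the diagram above
    DefaultDelayOperator<Long> delay =
        dag.addOperator("delay", new DefaultDelayOperator<Long>());
    dag.addStream("gaDone", genAck.done, delay.input);
    dag.addStream("delayToState", delay.output, managedState.completedIn);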

Thanks,

Jim

-----Original Message-----
From: Tushar Gosavi [mailto:tushar@datatorrent.com] 
Sent: Thursday, September 1, 2016 11:49 AM
To: users@apex.apache.org
Subject: Re: HDHT question - looking for the datatorrent gurus!

Hi Jim,

Currently HDHT is accessible only to a single operator in a DAG. A single HDHT store cannot be managed by two different operators at a time, as that could cause metadata corruption. Theoretically an HDHT bucket could be read from multiple operators, but only one writer is allowed.

In your case each stage in a transaction is processed completely by a different operator, and only then can the next stage start. This could still be achieved by using a single operator which manages the HDHT state, and having a loop in the DAG to send completed transaction ids back to the sequencer.

- The sequencer operator will emit transactions to the transaction processing operators.
- If it receives an out-of-order transaction it will note it down in HDHT.
- Each processing operator will send the completed transaction id on a port which is connected back to the sequencer operator.
- On receiving data on this loopback port, the sequencer operator will update HDHT and search for the next transaction in order, which could be stored in HDHT, and will emit it to the next processing operator.

- Tushar.


On Sat, Aug 27, 2016 at 1:31 AM, Jim <ji...@facility.supplies> wrote:
> Good afternoon,
>
>
>
> I have an apex application where I may receive edi transactions, but 
> sometimes they arrive out of order and I want to hold any out of 
> sequence transactions till the correct time in the flow to process them.
>
>
>
> For example for a standard order, we will receive from the remote vendor:
>
>
>
> 1.)    General Acknowledgement
>
> 2.)    Detailed Acknowledgement
>
> 3.)    Ship Notification
>
> 4.)    Invoice
>
>
>
> They are supposed to be sent and received in that order.
>
>
>
> However sometimes vendors’ systems have problems, etc., so they send all
> of these at the same time, and then we can receive them out of sequence.
> Data packets for these are very small, say from 1 to 512 bytes, and the
> only time they will be out of sequence is when we receive them very
> closely together.
>
>
>
> I am trying to think of the best way to do this in my datatorrent /
> Hadoop / yarn facilities, instead of creating a data table in
> PostgreSQL and using that.
>
>
>
> Can I create a flow that works like this (I am not sure if this makes 
> sense, or is the best way to solve my problem, while keeping state, 
> etc. maintained for all the operators):
>
>
>
> 1.)    In the inbound transaction router, check the HDHT store for the
> order number. If it doesn’t exist, this means it is a new order: if the
> transaction being processed is the general acknowledgment, emit the
> data to the general acknowledgement operator; if it is not, store the
> transaction data into the correct bucket identifying which transaction
> it is for, and record in HDHT, by order number, that the next step is
> the general acknowledgement.
>
> 2.)    Say the next transaction is the ship notification. In the router,
> we would check the HDHT store, see this is not the next expected
> transaction (say it is supposed to be the detail acknowledgement), so
> we would just post the data for the ship notification into the HDHT
> store and say we are done.
>
> 3.)    Say we now receive the detailed acknowledgement for an order whose
> next step IS the detailed acknowledgement. We would see this is the
> correct next transaction, emit it to the detailed acknowledgement
> operator, and update the HDHT store to show that the next transaction
> should be the ship notification.  NOTE:  we can’t emit the ship
> notification yet, till we have confirmed that the detailed
> acknowledgment has been completed.
>
> 4.)    In each of the 4 transaction operators, at the end of the
> processing we would update the HDHT store to show the next expected
> step, and if we have already received data for the next expected step,
> pull it from the HDHT store and write the transaction into our SQS
> queue, which is the input into the inbound transaction router at the
> beginning of the application, so it processes through the system.
>
>
>
> I believe HDHT can be used to pass data throughout an entire 
> application, and is not limited to just a per operator basis, correct?
>
>
>
> Any comments / feedback?
>
>
>
> Thanks,
>
>
>
> Jim

Re: HDHT question - looking for the datatorrent gurus!

Posted by Tushar Gosavi <tu...@datatorrent.com>.
Hi Jim,

Currently HDHT is accessible only to a single operator in a DAG. A
single HDHT store cannot be managed by two different operators at a
time, as that could cause metadata corruption. Theoretically an HDHT
bucket could be read from multiple operators, but only one writer is
allowed.

In your case each stage in a transaction is processed completely by a
different operator, and only then can the next stage start. This could
still be achieved by using a single operator which manages the HDHT
state, and having a loop in the DAG to send completed transaction ids
back to the sequencer.

- The sequencer operator will emit transactions to the transaction
processing operators.
- If it receives an out-of-order transaction it will note it down in HDHT.
- Each processing operator will send the completed transaction id on a
port which is connected back to the sequencer operator.
- On receiving data on this loopback port, the sequencer operator will
update HDHT and search for the next transaction in order, which could
be stored in HDHT, and will emit it to the next processing operator.
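
To make the control flow concrete, here is a minimal sketch of such a sequencer (all names are illustrative; the HDHT persistence is stubbed out with in-memory maps just to show the logic, and a real version would keep this state in HDHT and have one output port per downstream stage):

    import java.util.HashMap;
    import java.util.Map;
    import com.datatorrent.api.DefaultInputPort;
    import com.datatorrent.api.DefaultOutputPort;
    import com.datatorrent.common.util.BaseOperator;

    public class SequencerOperator extends BaseOperator {
      // next expected transaction type (0..3) per order number
      private final Map<Integer, Integer> nextStep = new HashMap<Integer, Integer>();
      // transactions that arrived early, per order number and type
      private final Map<Integer, Map<Integer, String>> held =
          new HashMap<Integer, Map<Integer, String>>();

      // single output port for brevity; one per stage in a real application
      public final transient DefaultOutputPort<String> emit =
          new DefaultOutputPort<String>();

      // inbound transactions from the router
      public final transient DefaultInputPort<Txn> in = new DefaultInputPort<Txn>() {
        @Override
        public void process(Txn t) {
          Integer expected = nextStep.get(t.orderNumber);
          int exp = (expected == null) ? 0 : expected.intValue();
          if (t.type == exp) {
            emit.emit(t.data);            // in order: forward downstream
          } else {
            Map<Integer, String> h = held.get(t.orderNumber);
            if (h == null) {
              h = new HashMap<Integer, String>();
              held.put(t.orderNumber, h);
            }
            h.put(t.type, t.data);        // early: park it (in HDHT in real life)
          }
        }
      };

      // loopback: processing operators report (orderNumber, type) when done
      public final transient DefaultInputPort<Txn> completed =
          new DefaultInputPort<Txn>() {
        @Override
        public void process(Txn t) {
          int next = t.type + 1;
          nextStep.put(t.orderNumber, next);
          Map<Integer, String> h = held.get(t.orderNumber);
          if (h != null && h.containsKey(next)) {
            emit.emit(h.remove(next));    // release the parked next stage
          }
        }
      };

      public static class Txn {
        public int orderNumber;
        public int type;   // 0=general ack, 1=detailed ack, 2=ship notif, 3=invoice
        public String data;
      }
    }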

- Tushar.


On Sat, Aug 27, 2016 at 1:31 AM, Jim <ji...@facility.supplies> wrote:
> Good afternoon,
>
>
>
> I have an apex application where I may receive edi transactions, but
> sometimes they arrive out of order and I want to hold any out of sequence
> transactions till the correct time in the flow to process them.
>
>
>
> For example for a standard order, we will receive from the remote vendor:
>
>
>
> 1.)    General Acknowledgement
>
> 2.)    Detailed Acknowledgement
>
> 3.)    Ship Notification
>
> 4.)    Invoice
>
>
>
> They are supposed to be sent and received in that order.
>
>
>
> However sometimes vendors’ systems have problems, etc., so they send all of
> these at the same time, and then we can receive them out of sequence.
> Data packets for these are very small, say from 1 to 512 bytes, and the only
> time they will be out of sequence is when we receive them very closely
> together.
>
>
>
> I am trying to think of the best way to do this in my datatorrent / Hadoop /
> yarn facilities, instead of creating a data table in PostgreSQL and using
> that.
>
>
>
> Can I create a flow that works like this (I am not sure if this makes sense,
> or is the best way to solve my problem, while keeping state, etc. maintained
> for all the operators):
>
>
>
> 1.)    In the inbound transaction router, check the HDHT store for the order
> number. If it doesn’t exist, this means it is a new order: if the
> transaction being processed is the general acknowledgment, emit the data
> to the general acknowledgement operator; if it is not, store the
> transaction data into the correct bucket identifying which transaction it is
> for, and record in HDHT, by order number, that the next step is the general
> acknowledgement.
>
> 2.)    Say the next transaction is the ship notification. In the router, we
> would check the HDHT store, see this is not the next expected transaction
> (say it is supposed to be the detail acknowledgement), so we would just post
> the data for the ship notification into the HDHT store and say we are done.
>
> 3.)    Say we now receive the detailed acknowledgement for an order whose
> next step IS the detailed acknowledgement. We would see this is the correct
> next transaction, emit it to the detailed acknowledgement operator, and
> update the HDHT store to show that the next transaction should be the ship
> notification.  NOTE:  we can’t emit the ship notification yet, till we have
> confirmed that the detailed acknowledgment has been completed.
>
> 4.)    In each of the 4 transaction operators, at the end of the processing
> we would update the HDHT store to show the next expected step, and if we
> have already received data for the next expected step, pull it from the HDHT
> store and write the transaction into our SQS queue, which is the input into
> the inbound transaction router at the beginning of the application, so it
> processes through the system.
>
>
>
> I believe HDHT can be used to pass data throughout an entire application,
> and is not limited to just a per operator basis, correct?
>
>
>
> Any comments / feedback?
>
>
>
> Thanks,
>
>
>
> Jim