You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@milagro.apache.org by sm...@apache.org on 2019/10/07 12:38:38 UTC

[incubator-milagro-dta] 02/02: Refactor Tendermint package

This is an automated email from the ASF dual-hosted git repository.

smihaylov pushed a commit to branch tendermint
in repository https://gitbox.apache.org/repos/asf/incubator-milagro-dta.git

commit 4f81f8ccf9e65e10b1d31570f5748d29032039fa
Author: Stanislav Mihaylov <sm...@gmail.com>
AuthorDate: Mon Oct 7 15:38:09 2019 +0300

    Refactor Tendermint package
---
 cmd/service/commands.go           |   1 +
 cmd/service/main.go               |  63 ++++++++--
 cmd/servicetester/tendertest1.sh  |   4 +-
 go.mod                            |   2 -
 go.sum                            |   5 +-
 libs/documents/docs.go            |   2 +-
 pkg/api/proto.go                  |  10 ++
 pkg/common/chain.go               |  14 +--
 pkg/common/common.go              |   5 +-
 pkg/defaultservice/fulfillTX.go   |  34 ++++--
 pkg/defaultservice/init.go        |  14 ++-
 pkg/defaultservice/order.go       |  35 ++++--
 pkg/defaultservice/orderTX.go     |  45 +++++--
 pkg/defaultservice/service.go     |  13 +-
 pkg/tendermint/cmd/cmd            | Bin 10230244 -> 0 bytes
 pkg/tendermint/config.go          |   6 -
 pkg/tendermint/connector.go       | 245 ++++++++++++++++++++++++++++++++++++++
 pkg/tendermint/tendermint.go      | 165 -------------------------
 pkg/tendermint/tendermint_test.go |  60 ----------
 pkg/tendermint/websockets.go      | 204 -------------------------------
 20 files changed, 424 insertions(+), 503 deletions(-)

diff --git a/cmd/service/commands.go b/cmd/service/commands.go
index d3d16bd..20a9300 100644
--- a/cmd/service/commands.go
+++ b/cmd/service/commands.go
@@ -78,6 +78,7 @@ func parseConfig(args []string) (*config.Config, error) {
 
 	fs := flag.NewFlagSet("daemon", flag.ExitOnError)
 	fs.StringVar(&(cfg.Plugins.Service), "service", cfg.Plugins.Service, "Service plugin")
+	fs.StringVar(&(cfg.Log.Level), "log-level", cfg.Log.Level, "Log level")
 
 	if err := fs.Parse(args); err != nil {
 		return nil, err
diff --git a/cmd/service/main.go b/cmd/service/main.go
index 41bb4a3..9c77efe 100644
--- a/cmd/service/main.go
+++ b/cmd/service/main.go
@@ -21,6 +21,7 @@ Package main - handles config, initialisation and starts the service daemon
 package main
 
 import (
+	"context"
 	"crypto/rand"
 	"fmt"
 	"net/http"
@@ -36,6 +37,7 @@ import (
 	"github.com/apache/incubator-milagro-dta/libs/ipfs"
 	"github.com/apache/incubator-milagro-dta/libs/logger"
 	"github.com/apache/incubator-milagro-dta/libs/transport"
+	"github.com/apache/incubator-milagro-dta/pkg/api"
 	"github.com/apache/incubator-milagro-dta/pkg/config"
 	"github.com/apache/incubator-milagro-dta/pkg/defaultservice"
 	"github.com/apache/incubator-milagro-dta/pkg/endpoints"
@@ -138,11 +140,12 @@ func startDaemon(args []string) error {
 		cfg.Log.Format,
 		cfg.Log.Level,
 	)
-
 	if err != nil {
 		return errors.Wrap(err, "init logger")
 	}
 
+	logger.Debug("Logger in DEBUG mode!")
+
 	// Create KV store
 	logger.Info("Datastore type: %s", cfg.Node.Datastore)
 	store, err := initDataStore(cfg.Node.Datastore)
@@ -187,6 +190,12 @@ func startDaemon(args []string) error {
 		}
 	}
 
+	// Init Tendermint node connector
+	tmConnector, err := tendermint.NewNodeConnector(cfg.Blockchain.BroadcastNode, cfg.Node.NodeID, store, logger)
+	if err != nil {
+		return errors.Wrap(err, "Blockchain Node connector")
+	}
+
 	//The Server must have a valid ID before starting up
 	svcPlugin := plugins.FindServicePlugin(cfg.Plugins.Service)
 	if svcPlugin == nil {
@@ -200,7 +209,7 @@ func startDaemon(args []string) error {
 		defaultservice.WithDataStore(store),
 		defaultservice.WithKeyStore(keyStore),
 		defaultservice.WithIPFS(ipfsConnector),
-		defaultservice.WithMasterFiduciary(masterFiduciaryServer),
+		defaultservice.WithTendermint(tmConnector),
 		defaultservice.WithConfig(cfg),
 	); err != nil {
 		return errors.Errorf("init service plugin %s", cfg.Plugins.Service)
@@ -223,23 +232,47 @@ func startDaemon(args []string) error {
 	}, []string{"method", "success"})
 
 	// Stop chan
+	ctx, cancelContext := context.WithCancel(context.Background())
+
 	errChan := make(chan error)
 
-	logger.Info("NODE ID (IPFS):  %v", svcPlugin.NodeID())
 	logger.Info("Node Type: %v", strings.ToLower(cfg.Node.NodeType))
-	endpoints := endpoints.Endpoints(svcPlugin, cfg.HTTP.CorsAllow, authorizer, logger, cfg.Node.NodeType, svcPlugin)
-	httpHandler := transport.NewHTTPHandler(endpoints, logger, duration)
+	logger.Info("Node ID:  %v", svcPlugin.NodeID())
+	logger.Info("Master Fiduciary: %v", svcPlugin.MasterFiduciaryNodeID())
 
 	//Connect to Blockchain - Tendermint
-	go tendermint.Subscribe(svcPlugin, store, logger, cfg.Node.NodeID, cfg.HTTP.ListenAddr)
-	if err != nil {
-		return errors.Wrap(err, "init Tendermint Blockchain")
-	}
+	go func() {
+		processFn := func(tx *api.BlockChainTX) error {
+			switch tx.Processor {
+			case "none":
+				return nil
+			case "dump":
+				svcPlugin.Dump(tx)
+			case "v1/fulfill/order":
+				svcPlugin.FulfillOrder(tx)
+			case "v1/order2":
+				svcPlugin.Order2(tx)
+			case "v1/fulfill/order/secret":
+				svcPlugin.FulfillOrderSecret(tx)
+			case "v1/order/secret2":
+				svcPlugin.OrderSecret2(tx)
+			default:
+				return errors.New("Unknown processor")
+			}
+			return nil
+		}
+
+		logger.Info("Starting Blockchain listener to node: %v", cfg.Blockchain.BroadcastNode)
+		errChan <- tmConnector.Subscribe(ctx, processFn)
+		// errChan <- tendermint.Subscribe(svcPlugin, store, logger, cfg.Node.NodeID, cfg.HTTP.ListenAddr)
+	}()
 
 	// Start the application http server
 	go func() {
-		logger.Info("starting listener on %v, custody server %v", cfg.HTTP.ListenAddr, cfg.Node.MasterFiduciaryServer)
-		// httpHandler.PathPrefix("/api/").Handler(http.St:ripPrefix("/api/", http.FileServer(http.Dir("./swagger"))))
+		httpEndpoints := endpoints.Endpoints(svcPlugin, cfg.HTTP.CorsAllow, authorizer, logger, cfg.Node.NodeType, svcPlugin)
+		httpHandler := transport.NewHTTPHandler(httpEndpoints, logger, duration)
+
+		logger.Info("Starting HTTP listener on %v", cfg.HTTP.ListenAddr)
 		errChan <- http.ListenAndServe(cfg.HTTP.ListenAddr, httpHandler)
 	}()
 
@@ -247,7 +280,7 @@ func startDaemon(args []string) error {
 		http.DefaultServeMux.Handle("/metrics", promhttp.Handler())
 		// Start the debug and metrics http server
 		go func() {
-			logger.Info("starting metrics listener on %v", cfg.HTTP.MetricsAddr)
+			logger.Info("Starting metrics listener on %v", cfg.HTTP.MetricsAddr)
 			errChan <- http.ListenAndServe(cfg.HTTP.MetricsAddr, http.DefaultServeMux)
 		}()
 	}
@@ -260,7 +293,11 @@ func startDaemon(args []string) error {
 	}()
 
 	stopErr := <-errChan
-	_ = logger.Log("exit", stopErr.Error())
+	if stopErr != nil {
+		_ = logger.Log("exit", stopErr.Error())
+	}
+
+	cancelContext()
 	return store.Close()
 }
 
diff --git a/cmd/servicetester/tendertest1.sh b/cmd/servicetester/tendertest1.sh
index 64fc139..7f91702 100755
--- a/cmd/servicetester/tendertest1.sh
+++ b/cmd/servicetester/tendertest1.sh
@@ -1,8 +1,8 @@
-ref=$(curl -s -X POST "127.0.0.1:5556/v1/order1" -H "accept: */*" -H "Content-Type: application/json" -d "{\"beneficiaryIDDocumentCID\":\"\",\"extension\":{\"coin\":\"0\"}}")
+ref=$(curl -s -X POST "127.0.0.1:5556/v1/order1" -H "accept: */*" -H "Content-Type: application/json" -d "{\"beneficiaryIDDocumentCID\":\"QmecffRZTSJDETCRLcjcPe7ynmYNyYpnh7WKzKTdmX1GBZ\",\"extension\":{\"coin\":\"0\"}}")
 
 #sleep long enough for blockchain to catch up
 sleep 4
 
-curl -X POST "127.0.0.1:5556/v1/order/secret1" -H "accept: */*" -H "Content-Type: application/json" -d "{\"orderReference\":$ref,\"beneficiaryIDDocumentCID\":\"QmcyJqEMqNEEYHrNSyUY83CQCNwZ5yVan3SgaQ4NchsqsC\"}"
+curl -X POST "127.0.0.1:5556/v1/order/secret1" -H "accept: */*" -H "Content-Type: application/json" -d "{\"orderReference\":$ref,\"beneficiaryIDDocumentCID\":\"QmecffRZTSJDETCRLcjcPe7ynmYNyYpnh7WKzKTdmX1GBZ\"}"
 
 
diff --git a/go.mod b/go.mod
index 7e0150c..860eb9c 100644
--- a/go.mod
+++ b/go.mod
@@ -2,8 +2,6 @@ module github.com/apache/incubator-milagro-dta
 
 require (
 	github.com/TylerBrock/colorjson v0.0.0-20180527164720-95ec53f28296
-	github.com/btcsuite/btcd v0.0.0-20190427004231-96897255fd17
-	github.com/VividCortex/gohistogram v1.0.0 // indirect
 	github.com/btcsuite/btcd v0.0.0-20190824003749-130ea5bddde3
 	github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d
 	github.com/coreos/go-oidc v2.0.0+incompatible
diff --git a/go.sum b/go.sum
index 1ee5995..fd46e0f 100644
--- a/go.sum
+++ b/go.sum
@@ -16,8 +16,8 @@ github.com/TylerBrock/colorjson v0.0.0-20180527164720-95ec53f28296 h1:JYWTroLXcN
 github.com/TylerBrock/colorjson v0.0.0-20180527164720-95ec53f28296/go.mod h1:VSw57q4QFiWDbRnjdX8Cb3Ow0SFncRw+bA/ofY6Q83w=
 github.com/VividCortex/gohistogram v1.0.0 h1:6+hBz+qvs0JOrrNhhmR7lFxo5sINxBCGXrdtl/UvroE=
 github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g=
-github.com/Workiva/go-datastructures v1.0.50/go.mod h1:Z+F2Rca0qCsVYDS8z7bAGm8f3UkzuWYS/oBZz5a7VVA=
 github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g=
+github.com/Workiva/go-datastructures v1.0.50/go.mod h1:Z+F2Rca0qCsVYDS8z7bAGm8f3UkzuWYS/oBZz5a7VVA=
 github.com/aead/siphash v1.0.1 h1:FwHfE/T45KPKYuuSAKyyvE+oPWcaQ+CUmFW0bPlM+kg=
 github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
 github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
@@ -105,9 +105,9 @@ github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8
 github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
 github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
 github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
+github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
 github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
 github.com/go-check/check v0.0.0-20180628173108-788fd7840127 h1:0gkP6mzaMqkmpcJYCFOLkIBwI7xFExG03bbkOkCvUPI=
-github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
 github.com/go-check/check v0.0.0-20180628173108-788fd7840127/go.mod h1:9ES+weclKsC9YodN5RgxqK/VD9HM9JsCSh7rNhMZE98=
 github.com/go-critic/go-critic v0.3.5-0.20190526074819-1df300866540/go.mod h1:+sE8vrLDS2M0pZkBk0wy6+nLdKexVDrl/jBqQOTDThA=
 github.com/go-kit/kit v0.8.0 h1:Wz+5lgoB0kkuqLEc6NVmwRknTKP6dTGbSqvhZtBI/j0=
@@ -689,6 +689,7 @@ github.com/tendermint/go-amino v0.14.1 h1:o2WudxNfdLNBwMyl2dqOJxiro5rfrEaU0Ugs6o
 github.com/tendermint/go-amino v0.14.1/go.mod h1:i/UKE5Uocn+argJJBb12qTZsCDBcAYMbR92AaJVmKso=
 github.com/tendermint/tendermint v0.32.4 h1:KwZIMtT+ROvfMYO3wine6F9hak3SpngcRcAIzys1J3I=
 github.com/tendermint/tendermint v0.32.4/go.mod h1:D2+A3pNjY+Po72X0mTfaXorFhiVI8dh/Zg640FGyGtE=
+github.com/tendermint/tendermint v0.32.5 h1:2hCLwuzfCKZxXSe/+iMEl+ChJWKJx6g/Wcvq3NMxVN4=
 github.com/tendermint/tm-db v0.2.0 h1:rJxgdqn6fIiVJZy4zLpY1qVlyD0TU6vhkT4kEf71TQQ=
 github.com/tendermint/tm-db v0.2.0/go.mod h1:0cPKWu2Mou3IlxecH+MEUSYc1Ch537alLe6CpFrKzgw=
 github.com/texttheater/golang-levenshtein v0.0.0-20180516184445-d188e65d659e/go.mod h1:XDKHRm5ThF8YJjx001LtgelzsoaEcvnA7lVWz9EeX3g=
diff --git a/libs/documents/docs.go b/libs/documents/docs.go
index 7e9fe27..59bad92 100644
--- a/libs/documents/docs.go
+++ b/libs/documents/docs.go
@@ -93,7 +93,7 @@ func DecodeIDDocument(rawdoc []byte, tag string, idDocument *IDDoc) error {
 	return nil
 }
 
-//PeekOrderDocument - look at the header inside an order document before decryption
+//OrderDocumentSigner - look at the header inside an order document before decryption
 func OrderDocumentSigner(rawDoc []byte) (string, error) {
 	signedEnvelope := SignedEnvelope{}
 	err := proto.Unmarshal(rawDoc, &signedEnvelope)
diff --git a/pkg/api/proto.go b/pkg/api/proto.go
index aedcd2c..e3200d7 100644
--- a/pkg/api/proto.go
+++ b/pkg/api/proto.go
@@ -24,6 +24,8 @@ package api
 */
 
 import (
+	"crypto/sha256"
+	"encoding/hex"
 	"time"
 )
 
@@ -51,6 +53,14 @@ type BlockChainTX struct {
 	Tags                   map[string]string
 }
 
+// CalcHash calculates, sets the TXhash and returns the string representation
+func (tx *BlockChainTX) CalcHash() string {
+	txSha := sha256.Sum256(tx.Payload)
+	tx.TXhash = txSha[:]
+	return hex.EncodeToString(txSha[:])
+
+}
+
 //CreateIdentityRequest -
 type CreateIdentityRequest struct {
 	Name      string            `json:"name,omitempty" validate:"required,alphanum"`
diff --git a/pkg/common/chain.go b/pkg/common/chain.go
index d7a7023..a0438b5 100644
--- a/pkg/common/chain.go
+++ b/pkg/common/chain.go
@@ -10,16 +10,8 @@ import (
 )
 
 // CreateTX creates the transaction ready for write to the chain
-func CreateTX(nodeID string, store *datastore.Store, id string, order *documents.OrderDoc, recipients map[string]documents.IDDoc) ([]byte, []byte, error) {
-	secrets := &IdentitySecrets{}
-	if err := store.Get("id-doc", nodeID, secrets); err != nil {
-		return nil, nil, errors.New("load secrets from store")
-	}
-	blsSecretKey, err := hex.DecodeString(secrets.BLSSecretKey)
-	if err != nil {
-		return nil, nil, errors.Wrap(err, "Decode identity secrets")
-	}
-	rawDoc, err := documents.EncodeOrderDocument(nodeID, *order, blsSecretKey, "previousID", recipients)
+func CreateTX(nodeID string, store *datastore.Store, blsSecretKey []byte, id string, order *documents.OrderDoc, recipients map[string]*documents.IDDoc) ([]byte, []byte, error) {
+	rawDoc, err := documents.EncodeOrderDocument(nodeID, *order, blsSecretKey, recipients)
 	if err != nil {
 		return nil, nil, errors.Wrap(err, "Failed to encode IDDocument")
 	}
@@ -32,7 +24,7 @@ func CreateTX(nodeID string, store *datastore.Store, id string, order *documents
 	return TXID[:], rawDoc, nil
 }
 
-//Decode a transaction for header data but don't decrypt it
+//PeekTX Decode a transaction for header data but don't decrypt it
 func PeekTX(tx []byte) (string, error) {
 	signerCID, err := documents.OrderDocumentSigner(tx)
 	print(signerCID)
diff --git a/pkg/common/common.go b/pkg/common/common.go
index 4e8220b..cdca4fe 100644
--- a/pkg/common/common.go
+++ b/pkg/common/common.go
@@ -105,6 +105,7 @@ func RetrieveSeed(store *datastore.Store, reference string) (seedHex string, err
 	return seedHex, nil
 }
 
+//WriteOrderToStore stores an order
 func WriteOrderToStore(store *datastore.Store, orderReference string, address string) error {
 	if err := store.Set("order", orderReference, address, map[string]string{"time": time.Now().UTC().Format(time.RFC3339)}); err != nil {
 		return errors.New("Save Order to store")
@@ -113,8 +114,8 @@ func WriteOrderToStore(store *datastore.Store, orderReference string, address st
 }
 
 // BuildRecipientList builds a list of recipients who are able to decrypt the encrypted envelope
-func BuildRecipientList(ipfs ipfs.Connector, IDDocs ...string) (map[string]documents.IDDoc, error) {
-	recipients := make(map[string]documents.IDDoc)
+func BuildRecipientList(ipfs ipfs.Connector, IDDocs ...string) (map[string]*documents.IDDoc, error) {
+	recipients := make(map[string]*documents.IDDoc)
 	for _, v := range IDDocs {
 		iddoc, err := RetrieveIDDocFromIPFS(ipfs, v)
 		if err != nil {
diff --git a/pkg/defaultservice/fulfillTX.go b/pkg/defaultservice/fulfillTX.go
index b0c1025..b0527a9 100644
--- a/pkg/defaultservice/fulfillTX.go
+++ b/pkg/defaultservice/fulfillTX.go
@@ -25,9 +25,10 @@ import (
 	"github.com/apache/incubator-milagro-dta/libs/documents"
 	"github.com/apache/incubator-milagro-dta/pkg/api"
 	"github.com/apache/incubator-milagro-dta/pkg/common"
-	"github.com/apache/incubator-milagro-dta/pkg/tendermint"
+	"github.com/apache/incubator-milagro-dta/pkg/identity"
 )
 
+// FulfillOrder TX
 func (s *Service) FulfillOrder(tx *api.BlockChainTX) (string, error) {
 	nodeID := s.NodeID()
 	reqPayload := tx.Payload
@@ -40,7 +41,16 @@ func (s *Service) FulfillOrder(tx *api.BlockChainTX) (string, error) {
 	}
 	remoteIDDocCID := signerID
 
-	_, _, _, sikeSK, err := common.RetrieveIdentitySecrets(s.Store, nodeID)
+	// SIKE and BLS keys
+	keyseed, err := s.KeyStore.Get("seed")
+	if err != nil {
+		return "", err
+	}
+	_, sikeSK, err := identity.GenerateSIKEKeys(keyseed)
+	if err != nil {
+		return "", err
+	}
+	_, blsSK, err := identity.GenerateBLSKeys(keyseed)
 	if err != nil {
 		return "", err
 	}
@@ -83,7 +93,7 @@ func (s *Service) FulfillOrder(tx *api.BlockChainTX) (string, error) {
 	}
 
 	//Create a new Transaction payload and TX
-	txHash, payload, err := common.CreateTX(nodeID, s.Store, nodeID, order, recipientList)
+	txHash, payload, err := common.CreateTX(nodeID, s.Store, blsSK, nodeID, order, recipientList)
 
 	//Write the requests to the chain
 	chainTX := &api.BlockChainTX{
@@ -95,8 +105,8 @@ func (s *Service) FulfillOrder(tx *api.BlockChainTX) (string, error) {
 		TXhash:                 txHash,
 		Tags:                   map[string]string{"reference": order.Reference, "txhash": hex.EncodeToString(txHash)},
 	}
-	return tendermint.PostToChain(chainTX, "FulfillOrder")
 
+	return s.Tendermint.PostTx(chainTX, "FulfillOrder")
 }
 
 // FulfillOrderSecret -
@@ -112,7 +122,16 @@ func (s *Service) FulfillOrderSecret(tx *api.BlockChainTX) (string, error) {
 	}
 	remoteIDDocCID := signerID
 
-	_, _, _, sikeSK, err := common.RetrieveIdentitySecrets(s.Store, nodeID)
+	// SIKE and BLS keys
+	keyseed, err := s.KeyStore.Get("seed")
+	if err != nil {
+		return "", err
+	}
+	_, sikeSK, err := identity.GenerateSIKEKeys(keyseed)
+	if err != nil {
+		return "", err
+	}
+	_, blsSK, err := identity.GenerateBLSKeys(keyseed)
 	if err != nil {
 		return "", err
 	}
@@ -155,7 +174,7 @@ func (s *Service) FulfillOrderSecret(tx *api.BlockChainTX) (string, error) {
 	}
 
 	//Create a new Transaction payload and TX
-	txHash, payload, err := common.CreateTX(nodeID, s.Store, nodeID, order, recipientList)
+	txHash, payload, err := common.CreateTX(nodeID, s.Store, blsSK, nodeID, order, recipientList)
 
 	//Write the requests to the chain
 	chainTX := &api.BlockChainTX{
@@ -167,5 +186,6 @@ func (s *Service) FulfillOrderSecret(tx *api.BlockChainTX) (string, error) {
 		Payload: payload,
 		Tags:    map[string]string{"reference": order.Reference, "txhash": hex.EncodeToString(txHash)},
 	}
-	return tendermint.PostToChain(chainTX, "FulfillOrderSecret")
+
+	return s.Tendermint.PostTx(chainTX, "FulfillOrderSecret")
 }
diff --git a/pkg/defaultservice/init.go b/pkg/defaultservice/init.go
index f0c0190..e4874ae 100644
--- a/pkg/defaultservice/init.go
+++ b/pkg/defaultservice/init.go
@@ -24,8 +24,8 @@ import (
 	"github.com/apache/incubator-milagro-dta/libs/ipfs"
 	"github.com/apache/incubator-milagro-dta/libs/keystore"
 	"github.com/apache/incubator-milagro-dta/libs/logger"
-	"github.com/apache/incubator-milagro-dta/pkg/api"
 	"github.com/apache/incubator-milagro-dta/pkg/config"
+	"github.com/apache/incubator-milagro-dta/pkg/tendermint"
 )
 
 // ServiceOption function to set Service properties
@@ -84,10 +84,18 @@ func WithIPFS(ipfsConnector ipfs.Connector) ServiceOption {
 	}
 }
 
+// WithTendermint adds tendermint node connector to the Service
+func WithTendermint(tmConnector *tendermint.NodeConnector) ServiceOption {
+	return func(s *Service) error {
+		s.Tendermint = tmConnector
+		return nil
+	}
+}
+
 // WithMasterFiduciary adds master fiduciary connector to the Service
-func WithMasterFiduciary(masterFiduciaryServer api.ClientService) ServiceOption {
+func WithMasterFiduciary(masterFiduciaryNodeID string) ServiceOption {
 	return func(s *Service) error {
-		s.MasterFiduciaryServer = masterFiduciaryServer
+		s.SetMasterFiduciaryNodeID(masterFiduciaryNodeID)
 		return nil
 	}
 }
diff --git a/pkg/defaultservice/order.go b/pkg/defaultservice/order.go
index acc6a6d..3f60158 100644
--- a/pkg/defaultservice/order.go
+++ b/pkg/defaultservice/order.go
@@ -27,7 +27,6 @@ import (
 	"github.com/apache/incubator-milagro-dta/pkg/api"
 	"github.com/apache/incubator-milagro-dta/pkg/common"
 	"github.com/apache/incubator-milagro-dta/pkg/identity"
-	"github.com/apache/incubator-milagro-dta/pkg/tendermint"
 	"github.com/pkg/errors"
 )
 
@@ -111,8 +110,6 @@ func (s *Service) PrepareOrderPart1(order *documents.OrderDoc, reqExtension map[
 }
 
 // PrepareOrderResponse gets the updated order and returns the commitment and extension
-//func (s *Service) PrepareOrderResponse(orderPart2 *documents.OrderDoc, reqExtension, fulfillExtension map[string]string) (commitment string, extension map[string]string, err error) {
-
 func (s *Service) PrepareOrderResponse(orderPart2 *documents.OrderDoc) (commitment string, extension map[string]string, err error) {
 	return orderPart2.OrderPart2.CommitmentPublicKey, nil, nil
 }
@@ -158,8 +155,18 @@ func (s *Service) Order1(req *api.OrderRequest) (string, error) {
 		order.OrderReqExtension[key] = value
 	}
 
+	// BLS key
+	keyseed, err := s.KeyStore.Get("seed")
+	if err != nil {
+		return "", err
+	}
+	_, blsSK, err := identity.GenerateBLSKeys(keyseed)
+	if err != nil {
+		return "", err
+	}
+
 	//This is serialized and output to the chain
-	txHash, payload, err := common.CreateTX(nodeID, s.Store, nodeID, order, recipientList)
+	txHash, payload, err := common.CreateTX(nodeID, s.Store, blsSK, nodeID, order, recipientList)
 
 	//Write the requests to the chain
 	chainTX := &api.BlockChainTX{
@@ -171,7 +178,11 @@ func (s *Service) Order1(req *api.OrderRequest) (string, error) {
 		TXhash:                 txHash,
 		Tags:                   map[string]string{"reference": order.Reference, "txhash": hex.EncodeToString(txHash)},
 	}
-	tendermint.PostToChain(chainTX, "Order1")
+
+	if _, err := s.Tendermint.PostTx(chainTX, "Order1"); err != nil {
+		return "", err
+	}
+
 	return order.Reference, nil
 }
 
@@ -196,11 +207,11 @@ func (s *Service) OrderSecret1(req *api.OrderSecretRequest) (string, error) {
 	// SIKE and BLS keys
 	keyseed, err := s.KeyStore.Get("seed")
 	if err != nil {
-		return nil, err
+		return "", err
 	}
 	_, sikeSK, err := identity.GenerateSIKEKeys(keyseed)
 	if err != nil {
-		return nil, err
+		return "", err
 	}
 	_, blsSK, err := identity.GenerateBLSKeys(keyseed)
 	if err != nil {
@@ -212,7 +223,7 @@ func (s *Service) OrderSecret1(req *api.OrderSecretRequest) (string, error) {
 		return "", err
 	}
 
-	tx, err := tendermint.TXbyHash(previousOrderHash)
+	tx, err := s.Tendermint.GetTx(previousOrderHash)
 	if err != nil {
 		return "", err
 	}
@@ -258,7 +269,7 @@ func (s *Service) OrderSecret1(req *api.OrderSecretRequest) (string, error) {
 		Timestamp:                time.Now().Unix(),
 	}
 
-	txHash, payload, err := common.CreateTX(nodeID, s.Store, nodeID, order, recipientList)
+	txHash, payload, err := common.CreateTX(nodeID, s.Store, blsSK, nodeID, order, recipientList)
 
 	//Write the requests to the chain
 	chainTX := &api.BlockChainTX{
@@ -269,6 +280,10 @@ func (s *Service) OrderSecret1(req *api.OrderSecretRequest) (string, error) {
 		Payload:                payload,
 		Tags:                   map[string]string{"reference": order.Reference, "txhash": hex.EncodeToString(txHash)},
 	}
-	tendermint.PostToChain(chainTX, "OrderSecret1")
+
+	if _, err := s.Tendermint.PostTx(chainTX, "OrderSecret1"); err != nil {
+		return "", err
+	}
+
 	return order.Reference, nil
 }
diff --git a/pkg/defaultservice/orderTX.go b/pkg/defaultservice/orderTX.go
index 4fe62f4..d6188c9 100644
--- a/pkg/defaultservice/orderTX.go
+++ b/pkg/defaultservice/orderTX.go
@@ -19,11 +19,12 @@ package defaultservice
 
 import (
 	"encoding/hex"
+	"fmt"
 
 	"github.com/apache/incubator-milagro-dta/libs/documents"
 	"github.com/apache/incubator-milagro-dta/pkg/api"
 	"github.com/apache/incubator-milagro-dta/pkg/common"
-	"github.com/apache/incubator-milagro-dta/pkg/tendermint"
+	"github.com/apache/incubator-milagro-dta/pkg/identity"
 	"github.com/pkg/errors"
 )
 
@@ -38,7 +39,16 @@ func (s *Service) Order2(tx *api.BlockChainTX) (string, error) {
 		return "", err
 	}
 
-	_, _, _, sikeSK, err := common.RetrieveIdentitySecrets(s.Store, nodeID)
+	// SIKE and BLS keys
+	keyseed, err := s.KeyStore.Get("seed")
+	if err != nil {
+		return "", err
+	}
+	_, sikeSK, err := identity.GenerateSIKEKeys(keyseed)
+	if err != nil {
+		return "", err
+	}
+	_, blsSK, err := identity.GenerateBLSKeys(keyseed)
 	if err != nil {
 		return "", err
 	}
@@ -73,7 +83,7 @@ func (s *Service) Order2(tx *api.BlockChainTX) (string, error) {
 	}
 
 	//Generate a transaction
-	txHash, payload, err := common.CreateTX(nodeID, s.Store, nodeID, order, recipientList)
+	txHash, payload, err := common.CreateTX(nodeID, s.Store, blsSK, nodeID, order, recipientList)
 
 	//Write the Order2 results to the chain
 	chainTX := &api.BlockChainTX{
@@ -84,8 +94,8 @@ func (s *Service) Order2(tx *api.BlockChainTX) (string, error) {
 		Payload:                payload,
 		Tags:                   map[string]string{"reference": order.Reference, "txhash": hex.EncodeToString(txHash)},
 	}
-	return tendermint.PostToChain(chainTX, "Order2")
 
+	return s.Tendermint.PostTx(chainTX, "Order2")
 }
 
 // OrderSecret2 - Process an incoming Blockchain Order/Secret transaction from a MasterFiduciary, to generate the final secret
@@ -94,7 +104,16 @@ func (s *Service) OrderSecret2(tx *api.BlockChainTX) (string, error) {
 	reqPayload := tx.Payload
 	txHashString := hex.EncodeToString(tx.TXhash)
 
-	_, _, _, sikeSK, err := common.RetrieveIdentitySecrets(s.Store, nodeID)
+	// SIKE and BLS keys
+	keyseed, err := s.KeyStore.Get("seed")
+	if err != nil {
+		return "", err
+	}
+	_, sikeSK, err := identity.GenerateSIKEKeys(keyseed)
+	if err != nil {
+		return "", err
+	}
+	_, blsSK, err := identity.GenerateBLSKeys(keyseed)
 	if err != nil {
 		return "", err
 	}
@@ -107,17 +126,16 @@ func (s *Service) OrderSecret2(tx *api.BlockChainTX) (string, error) {
 	//Decode the Order from the supplied TX
 	order := &documents.OrderDoc{}
 	err = documents.DecodeOrderDocument(reqPayload, txHashString, order, sikeSK, nodeID, remoteIDDoc.BLSPublicKey)
+	if err != nil {
+		fmt.Println("ERROR DEcode Order:", err)
+		return "", err
+	}
 
 	if order.BeneficiaryCID != nodeID {
 		return "", errors.New("Invalid Processor")
 	}
 
-	_, seed, _, sikeSK, err := common.RetrieveIdentitySecrets(s.Store, nodeID)
-	if err != nil {
-		return "", err
-	}
-
-	finalPrivateKey, _, extension, err := s.Plugin.ProduceFinalSecret(seed, sikeSK, order, order, nodeID)
+	finalPrivateKey, _, extension, err := s.Plugin.ProduceFinalSecret(keyseed, sikeSK, order, order, nodeID)
 	if err != nil {
 		return "", err
 	}
@@ -140,7 +158,7 @@ func (s *Service) OrderSecret2(tx *api.BlockChainTX) (string, error) {
 	if err != nil {
 		return "", err
 	}
-	txHash, payload, err := common.CreateTX(nodeID, s.Store, nodeID, order, recipientList)
+	txHash, payload, err := common.CreateTX(nodeID, s.Store, blsSK, nodeID, order, recipientList)
 
 	//Write the requests to the chain
 	chainTX := &api.BlockChainTX{
@@ -151,5 +169,6 @@ func (s *Service) OrderSecret2(tx *api.BlockChainTX) (string, error) {
 		Payload:                payload,
 		Tags:                   map[string]string{"reference": order.Reference, "txhash": hex.EncodeToString(txHash)},
 	}
-	return tendermint.PostToChain(chainTX, "OrderSecret2")
+
+	return s.Tendermint.PostTx(chainTX, "OrderSecret2")
 }
diff --git a/pkg/defaultservice/service.go b/pkg/defaultservice/service.go
index 9299b92..2b9fe0d 100644
--- a/pkg/defaultservice/service.go
+++ b/pkg/defaultservice/service.go
@@ -34,7 +34,8 @@ import (
 	"github.com/apache/incubator-milagro-dta/libs/transport"
 	"github.com/apache/incubator-milagro-dta/pkg/api"
 	"github.com/apache/incubator-milagro-dta/pkg/common"
-	"github.com/apache/incubator-milagro-dta/pkg/config"
+	"github.com/apache/incubator-milagro-dta/pkg/identity"
+	"github.com/apache/incubator-milagro-dta/pkg/tendermint"
 	"github.com/hokaccha/go-prettyjson"
 )
 
@@ -52,6 +53,7 @@ type Service struct {
 	Store                 *datastore.Store
 	KeyStore              keystore.Store
 	Ipfs                  ipfs.Connector
+	Tendermint            *tendermint.NodeConnector
 	nodeID                string
 	masterFiduciaryNodeID string
 }
@@ -115,7 +117,12 @@ func (s *Service) Dump(tx *api.BlockChainTX) error {
 		return err
 	}
 
-	_, _, _, sikeSK, err := common.RetrieveIdentitySecrets(s.Store, nodeID)
+	// SIKE and BLS keys
+	keyseed, err := s.KeyStore.Get("seed")
+	if err != nil {
+		return err
+	}
+	_, sikeSK, err := identity.GenerateSIKEKeys(keyseed)
 	if err != nil {
 		return err
 	}
@@ -127,6 +134,8 @@ func (s *Service) Dump(tx *api.BlockChainTX) error {
 	fmt.Println(string(pp))
 
 	return nil
+}
+
 // Endpoints for extending the plugin endpoints
 func (s *Service) Endpoints() (namespace string, endpoints transport.HTTPEndpoints) {
 	return s.Name(), nil
diff --git a/pkg/tendermint/cmd/cmd b/pkg/tendermint/cmd/cmd
deleted file mode 100755
index 0e07c4f..0000000
Binary files a/pkg/tendermint/cmd/cmd and /dev/null differ
diff --git a/pkg/tendermint/config.go b/pkg/tendermint/config.go
deleted file mode 100644
index 510189d..0000000
--- a/pkg/tendermint/config.go
+++ /dev/null
@@ -1,6 +0,0 @@
-package tendermint
-
-const (
-	//node = "127.0.0.1:26657"
-	node = "34.246.173.153:26657"
-)
diff --git a/pkg/tendermint/connector.go b/pkg/tendermint/connector.go
new file mode 100644
index 0000000..0dab29e
--- /dev/null
+++ b/pkg/tendermint/connector.go
@@ -0,0 +1,245 @@
+package tendermint
+
+import (
+	"context"
+	"encoding/base64"
+	"encoding/json"
+	"fmt"
+	"io/ioutil"
+	"net/http"
+	"strconv"
+	"strings"
+	"time"
+
+	"github.com/apache/incubator-milagro-dta/libs/datastore"
+	"github.com/apache/incubator-milagro-dta/libs/logger"
+	"github.com/apache/incubator-milagro-dta/pkg/api"
+	status "github.com/apache/incubator-milagro-dta/pkg/tendermint/status"
+	"github.com/pkg/errors"
+	tmclient "github.com/tendermint/tendermint/rpc/client"
+	tmtypes "github.com/tendermint/tendermint/types"
+)
+
+const (
+	nodeConnectionTimeout = time.Second * 10
+	txChanSize            = 1000
+)
+
+// ProcessTXFunc is executed on each incoming TX
+type ProcessTXFunc func(tx *api.BlockChainTX) error
+
+// NodeConnector is using external tendermint node to post and get transactions
+type NodeConnector struct {
+	nodeID     string
+	tmNodeAddr string
+	httpClient *http.Client
+	tmClient   *tmclient.HTTP
+	log        *logger.Logger
+	store      *datastore.Store
+}
+
+// NewNodeConnector constructs a new Tendermint NodeConnector
+func NewNodeConnector(tmNodeAddr string, nodeID string, store *datastore.Store, log *logger.Logger) (conn *NodeConnector, err error) {
+	defer func() {
+		if r := recover(); r != nil {
+			err = errors.Errorf("Initialize tendermint node connector: %v", r)
+		}
+	}()
+
+	tmNodeAddr = strings.TrimRight(tmNodeAddr, "/")
+	tmClient := tmclient.NewHTTP(fmt.Sprintf("tcp://%s", tmNodeAddr), "/websocket")
+	if err := tmClient.Start(); err != nil {
+		return nil, errors.Wrap(err, "Start tendermint client")
+	}
+
+	return &NodeConnector{
+		tmNodeAddr: tmNodeAddr,
+		nodeID:     nodeID,
+		log:        log,
+		store:      store,
+		httpClient: &http.Client{
+			Timeout: nodeConnectionTimeout,
+		},
+		tmClient: tmClient,
+	}, nil
+
+}
+
+// Stop is performing clean-up
+func (nc *NodeConnector) Stop() error {
+	return nc.tmClient.Stop()
+}
+
+// GetTx retreives a transaction by hash
+func (nc *NodeConnector) GetTx(txHash string) (*api.BlockChainTX, error) {
+	query := fmt.Sprintf("tag.txhash='%s'", txHash)
+	result, err := nc.tmClient.TxSearch(query, true, 1, 1)
+	if err != nil {
+		return nil, err
+	}
+	if len(result.Txs) == 0 {
+		return nil, errors.New("Transaction not found")
+	}
+
+	payload := &api.BlockChainTX{}
+	if err := json.Unmarshal(result.Txs[0].Tx, &payload); err != nil {
+		return nil, err
+	}
+
+	return payload, nil
+}
+
+// PostTx posts a transaction to the chain and returns the transaction ID
+func (nc *NodeConnector) PostTx(tx *api.BlockChainTX, method string) (txID string, err error) {
+	txID = tx.CalcHash()
+
+	//serialize the whole transaction
+	serializedTX, err := json.Marshal(tx)
+	if err != nil {
+		return
+	}
+	base64EncodedTX := base64.StdEncoding.EncodeToString(serializedTX)
+
+	// TODO: use net/rpc
+	body := strings.NewReader(`{
+		"jsonrpc": "2.0",
+		"id": "anything",
+		"method": "broadcast_tx_commit",
+		"params": {
+			"tx": "` + base64EncodedTX + `"}
+	}`)
+	url := "http://" + nc.tmNodeAddr
+
+	req, err := http.NewRequest("POST", url, body)
+	if err != nil {
+		return "", errors.Wrap(err, "post to blockchain node")
+	}
+	req.Header.Set("Content-Type", "text/plain;")
+
+	resp, err := nc.httpClient.Do(req)
+	if err != nil {
+		return "", errors.Wrap(err, "post to blockchain node")
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode != http.StatusOK {
+		var respErr string
+		if b, err := ioutil.ReadAll(resp.Body); err != nil {
+			respErr = resp.Status
+		} else {
+			respErr = string(b)
+		}
+
+		return "", errors.Errorf("Post to blockchain node status %v: %v", resp.StatusCode, respErr)
+	}
+
+	nc.log.Debug("POST TO CHAIN: METHOD: %s CALLS: %s  - TXID: %s", method, tx.Processor, txID)
+
+	return
+}
+
+// Subscribe connects to the Tendermint node and collect the events
+func (nc *NodeConnector) Subscribe(ctx context.Context, processFn ProcessTXFunc) error {
+	chainStatus, err := nc.getChainStatus()
+	if err != nil {
+		return err
+	}
+
+	currentBlockHeight, err := strconv.Atoi(chainStatus.Result.SyncInfo.LatestBlockHeight)
+	if err != nil {
+		return errors.Wrap(err, "Failed to obtain latest blockheight of Blockchain")
+	}
+
+	var processedToHeight int
+	if err := nc.store.Get("chain", "height", &processedToHeight); err != nil {
+		if err != datastore.ErrKeyNotFound {
+			return errors.Wrap(err, "Get last processed block height")
+		}
+	}
+
+	nc.log.Debug("Block height: Current: %v; Processed: %v", currentBlockHeight, processedToHeight)
+
+	// create the transaction queue chan
+	txQueue := make(chan *api.BlockChainTX, txChanSize)
+
+	// Collect events
+	if err := nc.subscribeAndQueue(ctx, txQueue); err != nil {
+		return err
+	}
+
+	// TODO: load historicTX
+
+	// Process events
+	return nc.processTXQueue(ctx, txQueue, processFn)
+}
+
+func (nc *NodeConnector) subscribeAndQueue(ctx context.Context, txQueue chan *api.BlockChainTX) error {
+	query := "tag.recipient='" + nc.nodeID + "'"
+
+	out, err := nc.tmClient.Subscribe(context.Background(), "test", query, 1000)
+	if err != nil {
+		return errors.Wrapf(err, "Failed to subscribe to query %s", query)
+	}
+
+	go func() {
+		for {
+			select {
+			case result := <-out:
+				tx := result.Data.(tmtypes.EventDataTx).Tx
+				payload := &api.BlockChainTX{}
+				err := json.Unmarshal(tx, payload)
+				if err != nil {
+					nc.log.Debug("IGNORED TX - Invalid!")
+					break
+				}
+
+				//check if this node is in receipient list
+				if payload.RecipientID != nc.nodeID {
+					nc.log.Debug("IGNORED TX! Recipient not match the query! (%v != %v)", payload.RecipientID, nc.nodeID)
+					break
+				}
+
+				//Add into the waitingQueue for later processing
+				txQueue <- payload
+			case <-ctx.Done():
+				return
+			}
+		}
+	}()
+
+	return nil
+}
+
+func (nc *NodeConnector) processTXQueue(ctx context.Context, txQueue chan *api.BlockChainTX, processFn ProcessTXFunc) error {
+	for {
+		select {
+		case tx := <-txQueue:
+			if err := processFn(tx); err != nil {
+				// TODO: errors block processing the queue
+				return err
+			}
+			// TODO: store the last block height
+		case <-ctx.Done():
+			return nil
+		}
+	}
+}
+
+func (nc *NodeConnector) getChainStatus() (*status.StatusResponse, error) {
+	url := fmt.Sprintf("http://%s/status", nc.tmNodeAddr)
+	resp, err := nc.httpClient.Get(url)
+	if err != nil {
+		return nil, errors.Wrap(err, "Get node status")
+	}
+	defer resp.Body.Close()
+	if resp.StatusCode != http.StatusOK {
+		return nil, errors.Errorf("Get node status status code: %v", resp.StatusCode)
+	}
+
+	status := &status.StatusResponse{}
+	if err := json.NewDecoder(resp.Body).Decode((&status)); err != nil {
+		return nil, errors.Wrap(err, "Invalid node status response")
+	}
+
+	return status, nil
+}
diff --git a/pkg/tendermint/tendermint.go b/pkg/tendermint/tendermint.go
deleted file mode 100644
index e02b3c1..0000000
--- a/pkg/tendermint/tendermint.go
+++ /dev/null
@@ -1,165 +0,0 @@
-package tendermint
-
-import (
-	"bufio"
-	"crypto/sha256"
-	"encoding/base64"
-	"encoding/hex"
-	"encoding/json"
-	"errors"
-	"fmt"
-	"net/http"
-	"strings"
-
-	"github.com/apache/incubator-milagro-dta/pkg/api"
-	"github.com/apache/incubator-milagro-dta/pkg/service"
-)
-
-//QueryChain the blockchain for an index
-func QueryChain(index string) (string, string) {
-	url := "http://" + node + "/abci_query?data=\"" + index + "\""
-	resp, err := http.Get(url)
-	if err != nil {
-		// handle err
-	}
-	defer resp.Body.Close()
-	scanner := bufio.NewScanner(resp.Body)
-	scanner.Split(bufio.ScanBytes)
-	t := ""
-	for scanner.Scan() {
-		t += scanner.Text()
-		///fmt.Print(scanner.Text())
-	}
-
-	res, _ := UnmarshalChainQuery([]byte(t))
-
-	val := res.Result.Response.Value
-	decodeVal, _ := base64.StdEncoding.DecodeString(val)
-	return string(decodeVal), val
-}
-
-//PostToChain - send TX data to the Blockchain
-func PostToChain(tx *api.BlockChainTX, method string) (string, error) {
-	//Create TX Hash
-
-	tx.RecipientID = tx.RecipientID
-
-	TXID := sha256.Sum256(tx.Payload)
-	TXIDhex := hex.EncodeToString(TXID[:])
-	tx.TXhash = TXID[:]
-
-	//serialize the whole transaction
-	serializedTX, _ := json.Marshal(tx)
-	base64EncodedTX := base64.StdEncoding.EncodeToString(serializedTX)
-
-	body := strings.NewReader("{\"jsonrpc\":\"2.0\",\"id\":\"anything\",\"method\":\"broadcast_tx_commit\",\"params\": {\"tx\": \"" + base64EncodedTX + "\"}}")
-	url := "http://" + node + ""
-
-	req, err := http.NewRequest("POST", url, body)
-	if err != nil {
-		print("Error posting to Blockchain")
-		return "", err
-	}
-	req.Header.Set("Content-Type", "text/plain;")
-
-	resp, err := http.DefaultClient.Do(req)
-	if err != nil {
-		print("Error posting to Blockchain")
-		return "", err
-	}
-	defer resp.Body.Close()
-	fmt.Printf("POST TO CHAIN: METHOD:%s CALLS:%s  - TXID:%s\n", method, tx.Processor, TXIDhex)
-	return TXIDhex, nil
-}
-
-//HandleChainTX -
-func HandleChainTX(myID string, tx string) error {
-	blockChainTX, err := decodeChainTX(tx)
-	if err != nil {
-		return err
-	}
-	panic(nil)
-	err = callNextTX(nil, blockChainTX, "5556")
-	if err != nil {
-		return err
-	}
-	return nil
-}
-
-//DecodeChainTX - Decode the On Chain TX into a BlockChainTX object
-func decodeChainTX(payload string) (*api.BlockChainTX, error) {
-	base64DecodedTX, _ := base64.StdEncoding.DecodeString(payload)
-	tx := &api.BlockChainTX{}
-
-	err := json.Unmarshal(base64DecodedTX, tx)
-	if err != nil {
-		return &api.BlockChainTX{}, err
-	}
-	return tx, nil
-}
-
-//DecodeChainTX - Decode the On Chain TX into a BlockChainTX object
-func decodeTX(payload string) (*api.BlockChainTX, string, error) {
-	tx := &api.BlockChainTX{}
-	parts := strings.SplitN(payload, "=", 2)
-	if len(parts) != 2 {
-		return &api.BlockChainTX{}, "", errors.New("Invalid TX payload")
-	}
-	hash := string(parts[0])
-	err := json.Unmarshal([]byte(parts[1]), tx)
-	if err != nil {
-		return &api.BlockChainTX{}, "", err
-	}
-	return tx, hash, nil
-}
-
-func callNextTX(svc service.Service, tx *api.BlockChainTX, listenPort string) error {
-	switch tx.Processor {
-	case "none":
-		return nil
-	case "dump":
-		svc.Dump(tx)
-	case "v1/fulfill/order":
-		svc.FulfillOrder(tx)
-	case "v1/order2":
-		svc.Order2(tx)
-	case "v1/fulfill/order/secret":
-		svc.FulfillOrderSecret(tx)
-	case "v1/order/secret2":
-		svc.OrderSecret2(tx)
-
-	default:
-		return errors.New("Unknown processor")
-	}
-	return nil
-}
-
-//DumpTXID -
-func DumpTXID(txid string) {
-	value, raw := QueryChain(txid)
-	println(value)
-	bc, _ := decodeChainTX(raw)
-	println(string(bc.Payload))
-	println()
-}
-
-//ProcessTransactionID -
-func ProcessTransactionID(txid string) {
-	_, payload := QueryChain((txid))
-	err := HandleChainTX("", payload)
-	if err != nil {
-		panic(err)
-	}
-}
-
-func unique(stringSlice []string) []string {
-	keys := make(map[string]bool)
-	list := []string{}
-	for _, entry := range stringSlice {
-		if _, value := keys[entry]; !value {
-			keys[entry] = true
-			list = append(list, entry)
-		}
-	}
-	return list
-}
diff --git a/pkg/tendermint/tendermint_test.go b/pkg/tendermint/tendermint_test.go
deleted file mode 100644
index 495f74f..0000000
--- a/pkg/tendermint/tendermint_test.go
+++ /dev/null
@@ -1,60 +0,0 @@
-package tendermint
-
-import "testing"
-
-var (
-	nodeID = "QmT4y4MtV5mvPHkFjfUQYQ7h1WvAagMy2GTJCn2bF8DQb7"
-)
-
-func Test_Order1(t *testing.T) {
-	a := "eyJQcm9jZXNzb3IiOiJ2MS9mdWxmaWxsL29yZGVyIiwiU2VuZGVySUQiOiJRbVQ0eTRNdFY1bXZQSGtGamZVUVlRN2gxV3ZBYWdNeTJHVEpDbjJiRjhEUWI3IiwiUmVjaXBpZW50SUQiOiJRbVQ0eTRNdFY1bXZQSGtGamZVUVlRN2gxV3ZBYWdNeTJHVEpDbjJiRjhEUWI3IiwiUGF5bG9hZCI6ImV5SnZjbVJsY2xCaGNuUXhRMGxFSWpvaVVXMVpVRU5xVEVGME1tbzVVbWhxU0U1TVkwRnVObEF5WTJseVJHWjZTRlpFWTBwMFkzbGtUVFZ5VWxoM1V5SXNJbVJ2WTNWdFpXNTBRMGxFSWpvaVVXMVVOSGswVFhSV05XMTJVRWhyUm1wbVZWRlpVVGRvTVZkMlFXRm5UWGt5UjFSS1EyNHlZa1k0UkZGaU55SjkifQ=="
-	err := HandleChainTX(nodeID, a)
-	if err != nil {
-		panic(err)
-	}
-}
-
-func Test_FullFill(t *testing.T) {
-	a := "eyJQcm9jZXNzb3IiOiJPUkRFUl9SRVNQT05TRSIsIlNlbmRlcklEIjoiUW1UNHk0TXRWNW12UEhrRmpmVVFZUTdoMVd2QWFnTXkyR1RKQ24yYkY4RFFiNyIsIlJlY2lwaWVudElEIjoiUW1UNHk0TXRWNW12UEhrRmpmVVFZUTdoMVd2QWFnTXkyR1RKQ24yYkY4RFFiNyIsIlBheWxvYWQiOiJleUp2Y21SbGNsQmhjblF5UTBsRUlqb2lVVzFVZUZka1ltZEdhRGxHYWpGMlJIbFhlazVCWkROVmFuRjNlVEYyTkRsRlFtVjJhRzUyTVVWdk5HVllSaUo5In0="
-	err := HandleChainTX(nodeID, a)
-	if err != nil {
-		panic(err)
-	}
-
-}
-
-func Test_DumpTXID(t *testing.T) {
-	a := "5fe5823c0d8b6d49f2ac99c90575566962ac3a14a6b2f1e7fe7ea1099b7b3bbd"
-	value, raw := QueryChain(a)
-	println(value)
-	bc, _ := decodeChainTX(raw)
-	print(string(bc.Payload))
-}
-
-//Use this to generate Order1
-//curl -s -X POST "http://localhost:5556/v1/order1" -H "accept: */*" -H "Content-Type: application/json" -d "{\"beneficiaryIDDocumentCID\":\"\",\"extension\":{\"coin\":\"0\"}}"
-
-func Test_All(t *testing.T) {
-	DumpTXID("dea1396bce7890f85da7dc86b4ece5c4d372886ed08948eca6a0beee36c412e0")
-
-}
-
-func Test_1(t *testing.T) {
-	txid := "473407b069ff917b110f38c36d5b9e5246b5ace5d82df38c5a188d5ac868cfec"
-	DumpTXID(txid)
-	ProcessTransactionID(txid)
-}
-
-func Test_2(t *testing.T) {
-	txid := "586bc14b15a31999571c8188241beef046d3b78a9481ecee984e7c76a1d95112"
-	DumpTXID(txid)
-	ProcessTransactionID(txid)
-}
-
-func Test_3(t *testing.T) {
-	txid := "5a48129fd272f2a8c57fdd96716a78c3be55a3cf811b179e82e54221d95ccbc4"
-	DumpTXID(txid)
-	ProcessTransactionID(txid)
-}
-
-//curl -s -X POST "http://localhost:5556/v1/order1" -H "accept: */*" -H "Content-Type: application/json" -d "{\"beneficiaryIDDocumentCID\":\"\",\"extension\":{\"coin\":\"0\"}}"
diff --git a/pkg/tendermint/websockets.go b/pkg/tendermint/websockets.go
deleted file mode 100644
index 472ea95..0000000
--- a/pkg/tendermint/websockets.go
+++ /dev/null
@@ -1,204 +0,0 @@
-package tendermint
-
-import (
-	"context"
-	"encoding/json"
-	"fmt"
-	"net/http"
-	"os"
-	"os/signal"
-	"strconv"
-	"syscall"
-
-	"github.com/apache/incubator-milagro-dta/libs/datastore"
-	"github.com/apache/incubator-milagro-dta/libs/logger"
-	"github.com/apache/incubator-milagro-dta/pkg/api"
-	"github.com/apache/incubator-milagro-dta/pkg/service"
-	status "github.com/apache/incubator-milagro-dta/pkg/tendermint/status"
-	"github.com/pkg/errors"
-	tmclient "github.com/tendermint/tendermint/rpc/client"
-	ctypes "github.com/tendermint/tendermint/rpc/core/types"
-	tmtypes "github.com/tendermint/tendermint/types"
-)
-
-func catchUp(quene chan tmtypes.Tx, store *datastore.Store, logger *logger.Logger, nodeID string, listenPort string, height int) error {
-	print("catch up")
-	return nil
-}
-
-//Subscribe to Websocket and add to queue
-func subscribeAndQueue(queueWaiting chan api.BlockChainTX, logger *logger.Logger, nodeID string, listenPort string, blockchainNode string) error {
-	client := tmclient.NewHTTP("tcp://"+blockchainNode+"", "/websocket")
-	//client.SetLogger(tmlogger)
-	err := client.Start()
-	if err != nil {
-		logger.Info("Failed to start Tendermint HTTP client %s", err)
-		return err
-	}
-	defer client.Stop()
-
-	//curl "34.246.173.153:26657/tx_search?query=\"tag.part=4%20AND%20tag.reference='579a2864-e100-11e9-aaf4-acde48001122'\""
-	query := "tag.recipient='" + nodeID + "'"
-	//query := "tm.event = 'Tx'"
-
-	out, err := client.Subscribe(context.Background(), "test", query, 1000)
-	if err != nil {
-		logger.Info("Failed to subscribe to query %s %s", query, err)
-		return err
-	}
-
-	logger.Info("Tendermint: Connected")
-
-	quit := make(chan os.Signal, 1)
-	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
-	for {
-		select {
-		case result := <-out:
-			tx := result.Data.(tmtypes.EventDataTx).Tx
-			payload := api.BlockChainTX{}
-			err := json.Unmarshal(tx, &payload)
-			if err != nil {
-				logger.Info("******** Invalid TX - ignored")
-				break
-			}
-
-			//check if this node is in receipient list
-			if payload.RecipientID != nodeID {
-				logger.Info("******** Invalid Recipient - why are we receiving this TX?")
-				break
-
-			}
-
-			//Add into the waitingQueue for later processing
-			queueWaiting <- payload
-			//fmt.Printf("Incoming Transaction:%d \n", len(queueWaiting))
-
-		case <-quit:
-			os.Exit(0)
-		}
-	}
-	return nil
-}
-
-func TXbyHash(TXHash string) (api.BlockChainTX, error) {
-	client := tmclient.NewHTTP("tcp://"+node+"", "/websocket")
-	query := fmt.Sprintf("tag.txhash='%s'", TXHash)
-	result, err := client.TxSearch(query, true, 1, 1)
-
-	if len(result.Txs) == 0 {
-		return api.BlockChainTX{}, errors.New("Not found")
-	}
-
-	payload := api.BlockChainTX{}
-	err = json.Unmarshal(result.Txs[0].Tx, &payload)
-
-	_ = payload
-
-	if err != nil {
-		return payload, err
-	}
-	//
-	// res := result.Txs[0]
-	// tx := res.Tx
-	return payload, nil
-
-}
-
-//loadAllHistoricTX - load the history for this node into a queue
-func loadAllHistoricTX(start int, end int, txHistory []ctypes.ResultTx, nodeID string, listenPort string) error {
-	//cycle through the historic transactions page by page
-	//Get all transactions that claim to be from me - check signatures
-	//Get all transactions that claim to be to me -
-
-	client := tmclient.NewHTTP("tcp://"+node+"", "/websocket")
-	currentPage := 1
-	query := fmt.Sprintf("tag.recipient='%v' AND tag.sender='%v' AND tx.height>%d AND tx.height<=%d", nodeID, nodeID, start, end)
-	numPerPage := 5
-
-	for {
-		result, err := client.TxSearch(query, true, currentPage, numPerPage)
-		if err != nil {
-			return errors.New("Failed to query chain for transaction history")
-		}
-
-		for _, tx := range result.Txs {
-			txHistory = append(txHistory, *tx)
-		}
-		if currentPage*numPerPage > result.TotalCount {
-			break
-		}
-		currentPage++
-	}
-	parseHistory(txHistory)
-	return nil
-}
-
-func parseHistory(txHistory []ctypes.ResultTx) {
-	txCount := len(txHistory)
-
-	//loop backwards
-	for i := txCount - 1; i >= 0; i-- {
-		resTx := txHistory[i]
-		tx := resTx.Tx
-
-		//Decode TX into BlockchainTX Object
-		payload := api.BlockChainTX{}
-		err := json.Unmarshal(tx, &payload)
-		if err != nil {
-			msg := fmt.Sprintf("Invalid Transaction Hash:%v Height:%v Index:% \n", resTx.Hash, resTx.Height, resTx.Index)
-			print(msg)
-			continue
-		}
-		//Decode BlockchainTX.payload into Protobuffer Qredo
-		// TODO:
-		// Parse the incoming TX, check sig
-		// If from self, can assume correct
-		// builds transaction chains using previous transactionHash
-		// Ensure every
-		// Check recipient/sender in tags are correct
-		//
-		_ = payload
-	}
-	print("Finished loading - but not parsing the History\n")
-}
-
-func processTXQueue(svc service.Service, queue chan api.BlockChainTX, listenPort string) {
-	print("Processing queue\n")
-	for payload := range queue {
-		callNextTX(svc, &payload, listenPort)
-	}
-}
-
-//Subscribe - Connect to the Tendermint websocket to collect events
-func Subscribe(svc service.Service, store *datastore.Store, logger *logger.Logger, nodeID string, listenPort string) error {
-
-	latestStatus, _ := getChainStatus(node)
-	currentBlockHeight, err := strconv.Atoi(latestStatus.Result.SyncInfo.LatestBlockHeight)
-
-	if err != nil {
-		return errors.New("Failed to obtain latest blockheight of Blockchain")
-	}
-
-	var processedToHeight int
-	store.Get("chain", "height", &processedToHeight)
-
-	//first catch up to Tip of chain
-	var txHistory []ctypes.ResultTx
-	queueWaiting := make(chan api.BlockChainTX, 1000)
-
-	//while we are processessing the history save all new transactions in a queue for later
-	go subscribeAndQueue(queueWaiting, logger, nodeID, listenPort, node)
-	loadAllHistoricTX(processedToHeight, currentBlockHeight, txHistory, nodeID, listenPort)
-	processTXQueue(svc, queueWaiting, listenPort)
-	return nil
-}
-
-func getChainStatus(node string) (status.StatusResponse, error) {
-	resp, err := http.Get("http://" + node + "/status")
-	result := status.StatusResponse{}
-	if err != nil {
-		return result, err
-	}
-	json.NewDecoder(resp.Body).Decode((&result))
-	return result, nil
-}