You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@milagro.apache.org by sm...@apache.org on 2019/10/07 12:38:38 UTC
[incubator-milagro-dta] 02/02: Refactor Tendermint package
This is an automated email from the ASF dual-hosted git repository.
smihaylov pushed a commit to branch tendermint
in repository https://gitbox.apache.org/repos/asf/incubator-milagro-dta.git
commit 4f81f8ccf9e65e10b1d31570f5748d29032039fa
Author: Stanislav Mihaylov <sm...@gmail.com>
AuthorDate: Mon Oct 7 15:38:09 2019 +0300
Refactor Tendermint package
---
cmd/service/commands.go | 1 +
cmd/service/main.go | 63 ++++++++--
cmd/servicetester/tendertest1.sh | 4 +-
go.mod | 2 -
go.sum | 5 +-
libs/documents/docs.go | 2 +-
pkg/api/proto.go | 10 ++
pkg/common/chain.go | 14 +--
pkg/common/common.go | 5 +-
pkg/defaultservice/fulfillTX.go | 34 ++++--
pkg/defaultservice/init.go | 14 ++-
pkg/defaultservice/order.go | 35 ++++--
pkg/defaultservice/orderTX.go | 45 +++++--
pkg/defaultservice/service.go | 13 +-
pkg/tendermint/cmd/cmd | Bin 10230244 -> 0 bytes
pkg/tendermint/config.go | 6 -
pkg/tendermint/connector.go | 245 ++++++++++++++++++++++++++++++++++++++
pkg/tendermint/tendermint.go | 165 -------------------------
pkg/tendermint/tendermint_test.go | 60 ----------
pkg/tendermint/websockets.go | 204 -------------------------------
20 files changed, 424 insertions(+), 503 deletions(-)
diff --git a/cmd/service/commands.go b/cmd/service/commands.go
index d3d16bd..20a9300 100644
--- a/cmd/service/commands.go
+++ b/cmd/service/commands.go
@@ -78,6 +78,7 @@ func parseConfig(args []string) (*config.Config, error) {
fs := flag.NewFlagSet("daemon", flag.ExitOnError)
fs.StringVar(&(cfg.Plugins.Service), "service", cfg.Plugins.Service, "Service plugin")
+ fs.StringVar(&(cfg.Log.Level), "log-level", cfg.Log.Level, "Log level")
if err := fs.Parse(args); err != nil {
return nil, err
diff --git a/cmd/service/main.go b/cmd/service/main.go
index 41bb4a3..9c77efe 100644
--- a/cmd/service/main.go
+++ b/cmd/service/main.go
@@ -21,6 +21,7 @@ Package main - handles config, initialisation and starts the service daemon
package main
import (
+ "context"
"crypto/rand"
"fmt"
"net/http"
@@ -36,6 +37,7 @@ import (
"github.com/apache/incubator-milagro-dta/libs/ipfs"
"github.com/apache/incubator-milagro-dta/libs/logger"
"github.com/apache/incubator-milagro-dta/libs/transport"
+ "github.com/apache/incubator-milagro-dta/pkg/api"
"github.com/apache/incubator-milagro-dta/pkg/config"
"github.com/apache/incubator-milagro-dta/pkg/defaultservice"
"github.com/apache/incubator-milagro-dta/pkg/endpoints"
@@ -138,11 +140,12 @@ func startDaemon(args []string) error {
cfg.Log.Format,
cfg.Log.Level,
)
-
if err != nil {
return errors.Wrap(err, "init logger")
}
+ logger.Debug("Logger in DEBUG mode!")
+
// Create KV store
logger.Info("Datastore type: %s", cfg.Node.Datastore)
store, err := initDataStore(cfg.Node.Datastore)
@@ -187,6 +190,12 @@ func startDaemon(args []string) error {
}
}
+ // Init Tendermint node connector
+ tmConnector, err := tendermint.NewNodeConnector(cfg.Blockchain.BroadcastNode, cfg.Node.NodeID, store, logger)
+ if err != nil {
+ return errors.Wrap(err, "Blockchain Node connector")
+ }
+
//The Server must have a valid ID before starting up
svcPlugin := plugins.FindServicePlugin(cfg.Plugins.Service)
if svcPlugin == nil {
@@ -200,7 +209,7 @@ func startDaemon(args []string) error {
defaultservice.WithDataStore(store),
defaultservice.WithKeyStore(keyStore),
defaultservice.WithIPFS(ipfsConnector),
- defaultservice.WithMasterFiduciary(masterFiduciaryServer),
+ defaultservice.WithTendermint(tmConnector),
defaultservice.WithConfig(cfg),
); err != nil {
return errors.Errorf("init service plugin %s", cfg.Plugins.Service)
@@ -223,23 +232,47 @@ func startDaemon(args []string) error {
}, []string{"method", "success"})
// Stop chan
+ ctx, cancelContext := context.WithCancel(context.Background())
+
errChan := make(chan error)
- logger.Info("NODE ID (IPFS): %v", svcPlugin.NodeID())
logger.Info("Node Type: %v", strings.ToLower(cfg.Node.NodeType))
- endpoints := endpoints.Endpoints(svcPlugin, cfg.HTTP.CorsAllow, authorizer, logger, cfg.Node.NodeType, svcPlugin)
- httpHandler := transport.NewHTTPHandler(endpoints, logger, duration)
+ logger.Info("Node ID: %v", svcPlugin.NodeID())
+ logger.Info("Master Fiduciary: %v", svcPlugin.MasterFiduciaryNodeID())
//Connect to Blockchain - Tendermint
- go tendermint.Subscribe(svcPlugin, store, logger, cfg.Node.NodeID, cfg.HTTP.ListenAddr)
- if err != nil {
- return errors.Wrap(err, "init Tendermint Blockchain")
- }
+ go func() {
+ processFn := func(tx *api.BlockChainTX) error {
+ switch tx.Processor {
+ case "none":
+ return nil
+ case "dump":
+ svcPlugin.Dump(tx)
+ case "v1/fulfill/order":
+ svcPlugin.FulfillOrder(tx)
+ case "v1/order2":
+ svcPlugin.Order2(tx)
+ case "v1/fulfill/order/secret":
+ svcPlugin.FulfillOrderSecret(tx)
+ case "v1/order/secret2":
+ svcPlugin.OrderSecret2(tx)
+ default:
+ return errors.New("Unknown processor")
+ }
+ return nil
+ }
+
+ logger.Info("Starting Blockchain listener to node: %v", cfg.Blockchain.BroadcastNode)
+ errChan <- tmConnector.Subscribe(ctx, processFn)
+ // errChan <- tendermint.Subscribe(svcPlugin, store, logger, cfg.Node.NodeID, cfg.HTTP.ListenAddr)
+ }()
// Start the application http server
go func() {
- logger.Info("starting listener on %v, custody server %v", cfg.HTTP.ListenAddr, cfg.Node.MasterFiduciaryServer)
- // httpHandler.PathPrefix("/api/").Handler(http.St:ripPrefix("/api/", http.FileServer(http.Dir("./swagger"))))
+ httpEndpoints := endpoints.Endpoints(svcPlugin, cfg.HTTP.CorsAllow, authorizer, logger, cfg.Node.NodeType, svcPlugin)
+ httpHandler := transport.NewHTTPHandler(httpEndpoints, logger, duration)
+
+ logger.Info("Starting HTTP listener on %v", cfg.HTTP.ListenAddr)
errChan <- http.ListenAndServe(cfg.HTTP.ListenAddr, httpHandler)
}()
@@ -247,7 +280,7 @@ func startDaemon(args []string) error {
http.DefaultServeMux.Handle("/metrics", promhttp.Handler())
// Start the debug and metrics http server
go func() {
- logger.Info("starting metrics listener on %v", cfg.HTTP.MetricsAddr)
+ logger.Info("Starting metrics listener on %v", cfg.HTTP.MetricsAddr)
errChan <- http.ListenAndServe(cfg.HTTP.MetricsAddr, http.DefaultServeMux)
}()
}
@@ -260,7 +293,11 @@ func startDaemon(args []string) error {
}()
stopErr := <-errChan
- _ = logger.Log("exit", stopErr.Error())
+ if stopErr != nil {
+ _ = logger.Log("exit", stopErr.Error())
+ }
+
+ cancelContext()
return store.Close()
}
diff --git a/cmd/servicetester/tendertest1.sh b/cmd/servicetester/tendertest1.sh
index 64fc139..7f91702 100755
--- a/cmd/servicetester/tendertest1.sh
+++ b/cmd/servicetester/tendertest1.sh
@@ -1,8 +1,8 @@
-ref=$(curl -s -X POST "127.0.0.1:5556/v1/order1" -H "accept: */*" -H "Content-Type: application/json" -d "{\"beneficiaryIDDocumentCID\":\"\",\"extension\":{\"coin\":\"0\"}}")
+ref=$(curl -s -X POST "127.0.0.1:5556/v1/order1" -H "accept: */*" -H "Content-Type: application/json" -d "{\"beneficiaryIDDocumentCID\":\"QmecffRZTSJDETCRLcjcPe7ynmYNyYpnh7WKzKTdmX1GBZ\",\"extension\":{\"coin\":\"0\"}}")
#sleep long enough for blockchain to catch up
sleep 4
-curl -X POST "127.0.0.1:5556/v1/order/secret1" -H "accept: */*" -H "Content-Type: application/json" -d "{\"orderReference\":$ref,\"beneficiaryIDDocumentCID\":\"QmcyJqEMqNEEYHrNSyUY83CQCNwZ5yVan3SgaQ4NchsqsC\"}"
+curl -X POST "127.0.0.1:5556/v1/order/secret1" -H "accept: */*" -H "Content-Type: application/json" -d "{\"orderReference\":$ref,\"beneficiaryIDDocumentCID\":\"QmecffRZTSJDETCRLcjcPe7ynmYNyYpnh7WKzKTdmX1GBZ\"}"
diff --git a/go.mod b/go.mod
index 7e0150c..860eb9c 100644
--- a/go.mod
+++ b/go.mod
@@ -2,8 +2,6 @@ module github.com/apache/incubator-milagro-dta
require (
github.com/TylerBrock/colorjson v0.0.0-20180527164720-95ec53f28296
- github.com/btcsuite/btcd v0.0.0-20190427004231-96897255fd17
- github.com/VividCortex/gohistogram v1.0.0 // indirect
github.com/btcsuite/btcd v0.0.0-20190824003749-130ea5bddde3
github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d
github.com/coreos/go-oidc v2.0.0+incompatible
diff --git a/go.sum b/go.sum
index 1ee5995..fd46e0f 100644
--- a/go.sum
+++ b/go.sum
@@ -16,8 +16,8 @@ github.com/TylerBrock/colorjson v0.0.0-20180527164720-95ec53f28296 h1:JYWTroLXcN
github.com/TylerBrock/colorjson v0.0.0-20180527164720-95ec53f28296/go.mod h1:VSw57q4QFiWDbRnjdX8Cb3Ow0SFncRw+bA/ofY6Q83w=
github.com/VividCortex/gohistogram v1.0.0 h1:6+hBz+qvs0JOrrNhhmR7lFxo5sINxBCGXrdtl/UvroE=
github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g=
-github.com/Workiva/go-datastructures v1.0.50/go.mod h1:Z+F2Rca0qCsVYDS8z7bAGm8f3UkzuWYS/oBZz5a7VVA=
github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g=
+github.com/Workiva/go-datastructures v1.0.50/go.mod h1:Z+F2Rca0qCsVYDS8z7bAGm8f3UkzuWYS/oBZz5a7VVA=
github.com/aead/siphash v1.0.1 h1:FwHfE/T45KPKYuuSAKyyvE+oPWcaQ+CUmFW0bPlM+kg=
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
@@ -105,9 +105,9 @@ github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
+github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/go-check/check v0.0.0-20180628173108-788fd7840127 h1:0gkP6mzaMqkmpcJYCFOLkIBwI7xFExG03bbkOkCvUPI=
-github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/go-check/check v0.0.0-20180628173108-788fd7840127/go.mod h1:9ES+weclKsC9YodN5RgxqK/VD9HM9JsCSh7rNhMZE98=
github.com/go-critic/go-critic v0.3.5-0.20190526074819-1df300866540/go.mod h1:+sE8vrLDS2M0pZkBk0wy6+nLdKexVDrl/jBqQOTDThA=
github.com/go-kit/kit v0.8.0 h1:Wz+5lgoB0kkuqLEc6NVmwRknTKP6dTGbSqvhZtBI/j0=
@@ -689,6 +689,7 @@ github.com/tendermint/go-amino v0.14.1 h1:o2WudxNfdLNBwMyl2dqOJxiro5rfrEaU0Ugs6o
github.com/tendermint/go-amino v0.14.1/go.mod h1:i/UKE5Uocn+argJJBb12qTZsCDBcAYMbR92AaJVmKso=
github.com/tendermint/tendermint v0.32.4 h1:KwZIMtT+ROvfMYO3wine6F9hak3SpngcRcAIzys1J3I=
github.com/tendermint/tendermint v0.32.4/go.mod h1:D2+A3pNjY+Po72X0mTfaXorFhiVI8dh/Zg640FGyGtE=
+github.com/tendermint/tendermint v0.32.5 h1:2hCLwuzfCKZxXSe/+iMEl+ChJWKJx6g/Wcvq3NMxVN4=
github.com/tendermint/tm-db v0.2.0 h1:rJxgdqn6fIiVJZy4zLpY1qVlyD0TU6vhkT4kEf71TQQ=
github.com/tendermint/tm-db v0.2.0/go.mod h1:0cPKWu2Mou3IlxecH+MEUSYc1Ch537alLe6CpFrKzgw=
github.com/texttheater/golang-levenshtein v0.0.0-20180516184445-d188e65d659e/go.mod h1:XDKHRm5ThF8YJjx001LtgelzsoaEcvnA7lVWz9EeX3g=
diff --git a/libs/documents/docs.go b/libs/documents/docs.go
index 7e9fe27..59bad92 100644
--- a/libs/documents/docs.go
+++ b/libs/documents/docs.go
@@ -93,7 +93,7 @@ func DecodeIDDocument(rawdoc []byte, tag string, idDocument *IDDoc) error {
return nil
}
-//PeekOrderDocument - look at the header inside an order document before decryption
+//OrderDocumentSigner - look at the header inside an order document before decryption
func OrderDocumentSigner(rawDoc []byte) (string, error) {
signedEnvelope := SignedEnvelope{}
err := proto.Unmarshal(rawDoc, &signedEnvelope)
diff --git a/pkg/api/proto.go b/pkg/api/proto.go
index aedcd2c..e3200d7 100644
--- a/pkg/api/proto.go
+++ b/pkg/api/proto.go
@@ -24,6 +24,8 @@ package api
*/
import (
+ "crypto/sha256"
+ "encoding/hex"
"time"
)
@@ -51,6 +53,14 @@ type BlockChainTX struct {
Tags map[string]string
}
+// CalcHash computes the SHA-256 hash of the payload, stores it in TXhash and returns its hex string representation
+func (tx *BlockChainTX) CalcHash() string {
+ txSha := sha256.Sum256(tx.Payload)
+ tx.TXhash = txSha[:]
+ return hex.EncodeToString(txSha[:])
+
+}
+
//CreateIdentityRequest -
type CreateIdentityRequest struct {
Name string `json:"name,omitempty" validate:"required,alphanum"`
diff --git a/pkg/common/chain.go b/pkg/common/chain.go
index d7a7023..a0438b5 100644
--- a/pkg/common/chain.go
+++ b/pkg/common/chain.go
@@ -10,16 +10,8 @@ import (
)
// CreateTX creates the transaction ready for write to the chain
-func CreateTX(nodeID string, store *datastore.Store, id string, order *documents.OrderDoc, recipients map[string]documents.IDDoc) ([]byte, []byte, error) {
- secrets := &IdentitySecrets{}
- if err := store.Get("id-doc", nodeID, secrets); err != nil {
- return nil, nil, errors.New("load secrets from store")
- }
- blsSecretKey, err := hex.DecodeString(secrets.BLSSecretKey)
- if err != nil {
- return nil, nil, errors.Wrap(err, "Decode identity secrets")
- }
- rawDoc, err := documents.EncodeOrderDocument(nodeID, *order, blsSecretKey, "previousID", recipients)
+func CreateTX(nodeID string, store *datastore.Store, blsSecretKey []byte, id string, order *documents.OrderDoc, recipients map[string]*documents.IDDoc) ([]byte, []byte, error) {
+ rawDoc, err := documents.EncodeOrderDocument(nodeID, *order, blsSecretKey, recipients)
if err != nil {
return nil, nil, errors.Wrap(err, "Failed to encode IDDocument")
}
@@ -32,7 +24,7 @@ func CreateTX(nodeID string, store *datastore.Store, id string, order *documents
return TXID[:], rawDoc, nil
}
-//Decode a transaction for header data but don't decrypt it
+//PeekTX Decode a transaction for header data but don't decrypt it
func PeekTX(tx []byte) (string, error) {
signerCID, err := documents.OrderDocumentSigner(tx)
print(signerCID)
diff --git a/pkg/common/common.go b/pkg/common/common.go
index 4e8220b..cdca4fe 100644
--- a/pkg/common/common.go
+++ b/pkg/common/common.go
@@ -105,6 +105,7 @@ func RetrieveSeed(store *datastore.Store, reference string) (seedHex string, err
return seedHex, nil
}
+//WriteOrderToStore stores an order
func WriteOrderToStore(store *datastore.Store, orderReference string, address string) error {
if err := store.Set("order", orderReference, address, map[string]string{"time": time.Now().UTC().Format(time.RFC3339)}); err != nil {
return errors.New("Save Order to store")
@@ -113,8 +114,8 @@ func WriteOrderToStore(store *datastore.Store, orderReference string, address st
}
// BuildRecipientList builds a list of recipients who are able to decrypt the encrypted envelope
-func BuildRecipientList(ipfs ipfs.Connector, IDDocs ...string) (map[string]documents.IDDoc, error) {
- recipients := make(map[string]documents.IDDoc)
+func BuildRecipientList(ipfs ipfs.Connector, IDDocs ...string) (map[string]*documents.IDDoc, error) {
+ recipients := make(map[string]*documents.IDDoc)
for _, v := range IDDocs {
iddoc, err := RetrieveIDDocFromIPFS(ipfs, v)
if err != nil {
diff --git a/pkg/defaultservice/fulfillTX.go b/pkg/defaultservice/fulfillTX.go
index b0c1025..b0527a9 100644
--- a/pkg/defaultservice/fulfillTX.go
+++ b/pkg/defaultservice/fulfillTX.go
@@ -25,9 +25,10 @@ import (
"github.com/apache/incubator-milagro-dta/libs/documents"
"github.com/apache/incubator-milagro-dta/pkg/api"
"github.com/apache/incubator-milagro-dta/pkg/common"
- "github.com/apache/incubator-milagro-dta/pkg/tendermint"
+ "github.com/apache/incubator-milagro-dta/pkg/identity"
)
+// FulfillOrder TX
func (s *Service) FulfillOrder(tx *api.BlockChainTX) (string, error) {
nodeID := s.NodeID()
reqPayload := tx.Payload
@@ -40,7 +41,16 @@ func (s *Service) FulfillOrder(tx *api.BlockChainTX) (string, error) {
}
remoteIDDocCID := signerID
- _, _, _, sikeSK, err := common.RetrieveIdentitySecrets(s.Store, nodeID)
+ // SIKE and BLS keys
+ keyseed, err := s.KeyStore.Get("seed")
+ if err != nil {
+ return "", err
+ }
+ _, sikeSK, err := identity.GenerateSIKEKeys(keyseed)
+ if err != nil {
+ return "", err
+ }
+ _, blsSK, err := identity.GenerateBLSKeys(keyseed)
if err != nil {
return "", err
}
@@ -83,7 +93,7 @@ func (s *Service) FulfillOrder(tx *api.BlockChainTX) (string, error) {
}
//Create a new Transaction payload and TX
- txHash, payload, err := common.CreateTX(nodeID, s.Store, nodeID, order, recipientList)
+ txHash, payload, err := common.CreateTX(nodeID, s.Store, blsSK, nodeID, order, recipientList)
//Write the requests to the chain
chainTX := &api.BlockChainTX{
@@ -95,8 +105,8 @@ func (s *Service) FulfillOrder(tx *api.BlockChainTX) (string, error) {
TXhash: txHash,
Tags: map[string]string{"reference": order.Reference, "txhash": hex.EncodeToString(txHash)},
}
- return tendermint.PostToChain(chainTX, "FulfillOrder")
+ return s.Tendermint.PostTx(chainTX, "FulfillOrder")
}
// FulfillOrderSecret -
@@ -112,7 +122,16 @@ func (s *Service) FulfillOrderSecret(tx *api.BlockChainTX) (string, error) {
}
remoteIDDocCID := signerID
- _, _, _, sikeSK, err := common.RetrieveIdentitySecrets(s.Store, nodeID)
+ // SIKE and BLS keys
+ keyseed, err := s.KeyStore.Get("seed")
+ if err != nil {
+ return "", err
+ }
+ _, sikeSK, err := identity.GenerateSIKEKeys(keyseed)
+ if err != nil {
+ return "", err
+ }
+ _, blsSK, err := identity.GenerateBLSKeys(keyseed)
if err != nil {
return "", err
}
@@ -155,7 +174,7 @@ func (s *Service) FulfillOrderSecret(tx *api.BlockChainTX) (string, error) {
}
//Create a new Transaction payload and TX
- txHash, payload, err := common.CreateTX(nodeID, s.Store, nodeID, order, recipientList)
+ txHash, payload, err := common.CreateTX(nodeID, s.Store, blsSK, nodeID, order, recipientList)
//Write the requests to the chain
chainTX := &api.BlockChainTX{
@@ -167,5 +186,6 @@ func (s *Service) FulfillOrderSecret(tx *api.BlockChainTX) (string, error) {
Payload: payload,
Tags: map[string]string{"reference": order.Reference, "txhash": hex.EncodeToString(txHash)},
}
- return tendermint.PostToChain(chainTX, "FulfillOrderSecret")
+
+ return s.Tendermint.PostTx(chainTX, "FulfillOrderSecret")
}
diff --git a/pkg/defaultservice/init.go b/pkg/defaultservice/init.go
index f0c0190..e4874ae 100644
--- a/pkg/defaultservice/init.go
+++ b/pkg/defaultservice/init.go
@@ -24,8 +24,8 @@ import (
"github.com/apache/incubator-milagro-dta/libs/ipfs"
"github.com/apache/incubator-milagro-dta/libs/keystore"
"github.com/apache/incubator-milagro-dta/libs/logger"
- "github.com/apache/incubator-milagro-dta/pkg/api"
"github.com/apache/incubator-milagro-dta/pkg/config"
+ "github.com/apache/incubator-milagro-dta/pkg/tendermint"
)
// ServiceOption function to set Service properties
@@ -84,10 +84,18 @@ func WithIPFS(ipfsConnector ipfs.Connector) ServiceOption {
}
}
+// WithTendermint adds tendermint node connector to the Service
+func WithTendermint(tmConnector *tendermint.NodeConnector) ServiceOption {
+ return func(s *Service) error {
+ s.Tendermint = tmConnector
+ return nil
+ }
+}
+
// WithMasterFiduciary adds master fiduciary connector to the Service
-func WithMasterFiduciary(masterFiduciaryServer api.ClientService) ServiceOption {
+func WithMasterFiduciary(masterFiduciaryNodeID string) ServiceOption {
return func(s *Service) error {
- s.MasterFiduciaryServer = masterFiduciaryServer
+ s.SetMasterFiduciaryNodeID(masterFiduciaryNodeID)
return nil
}
}
diff --git a/pkg/defaultservice/order.go b/pkg/defaultservice/order.go
index acc6a6d..3f60158 100644
--- a/pkg/defaultservice/order.go
+++ b/pkg/defaultservice/order.go
@@ -27,7 +27,6 @@ import (
"github.com/apache/incubator-milagro-dta/pkg/api"
"github.com/apache/incubator-milagro-dta/pkg/common"
"github.com/apache/incubator-milagro-dta/pkg/identity"
- "github.com/apache/incubator-milagro-dta/pkg/tendermint"
"github.com/pkg/errors"
)
@@ -111,8 +110,6 @@ func (s *Service) PrepareOrderPart1(order *documents.OrderDoc, reqExtension map[
}
// PrepareOrderResponse gets the updated order and returns the commitment and extension
-//func (s *Service) PrepareOrderResponse(orderPart2 *documents.OrderDoc, reqExtension, fulfillExtension map[string]string) (commitment string, extension map[string]string, err error) {
-
func (s *Service) PrepareOrderResponse(orderPart2 *documents.OrderDoc) (commitment string, extension map[string]string, err error) {
return orderPart2.OrderPart2.CommitmentPublicKey, nil, nil
}
@@ -158,8 +155,18 @@ func (s *Service) Order1(req *api.OrderRequest) (string, error) {
order.OrderReqExtension[key] = value
}
+ // BLS key
+ keyseed, err := s.KeyStore.Get("seed")
+ if err != nil {
+ return "", err
+ }
+ _, blsSK, err := identity.GenerateBLSKeys(keyseed)
+ if err != nil {
+ return "", err
+ }
+
//This is serialized and output to the chain
- txHash, payload, err := common.CreateTX(nodeID, s.Store, nodeID, order, recipientList)
+ txHash, payload, err := common.CreateTX(nodeID, s.Store, blsSK, nodeID, order, recipientList)
//Write the requests to the chain
chainTX := &api.BlockChainTX{
@@ -171,7 +178,11 @@ func (s *Service) Order1(req *api.OrderRequest) (string, error) {
TXhash: txHash,
Tags: map[string]string{"reference": order.Reference, "txhash": hex.EncodeToString(txHash)},
}
- tendermint.PostToChain(chainTX, "Order1")
+
+ if _, err := s.Tendermint.PostTx(chainTX, "Order1"); err != nil {
+ return "", err
+ }
+
return order.Reference, nil
}
@@ -196,11 +207,11 @@ func (s *Service) OrderSecret1(req *api.OrderSecretRequest) (string, error) {
// SIKE and BLS keys
keyseed, err := s.KeyStore.Get("seed")
if err != nil {
- return nil, err
+ return "", err
}
_, sikeSK, err := identity.GenerateSIKEKeys(keyseed)
if err != nil {
- return nil, err
+ return "", err
}
_, blsSK, err := identity.GenerateBLSKeys(keyseed)
if err != nil {
@@ -212,7 +223,7 @@ func (s *Service) OrderSecret1(req *api.OrderSecretRequest) (string, error) {
return "", err
}
- tx, err := tendermint.TXbyHash(previousOrderHash)
+ tx, err := s.Tendermint.GetTx(previousOrderHash)
if err != nil {
return "", err
}
@@ -258,7 +269,7 @@ func (s *Service) OrderSecret1(req *api.OrderSecretRequest) (string, error) {
Timestamp: time.Now().Unix(),
}
- txHash, payload, err := common.CreateTX(nodeID, s.Store, nodeID, order, recipientList)
+ txHash, payload, err := common.CreateTX(nodeID, s.Store, blsSK, nodeID, order, recipientList)
//Write the requests to the chain
chainTX := &api.BlockChainTX{
@@ -269,6 +280,10 @@ func (s *Service) OrderSecret1(req *api.OrderSecretRequest) (string, error) {
Payload: payload,
Tags: map[string]string{"reference": order.Reference, "txhash": hex.EncodeToString(txHash)},
}
- tendermint.PostToChain(chainTX, "OrderSecret1")
+
+ if _, err := s.Tendermint.PostTx(chainTX, "OrderSecret1"); err != nil {
+ return "", err
+ }
+
return order.Reference, nil
}
diff --git a/pkg/defaultservice/orderTX.go b/pkg/defaultservice/orderTX.go
index 4fe62f4..d6188c9 100644
--- a/pkg/defaultservice/orderTX.go
+++ b/pkg/defaultservice/orderTX.go
@@ -19,11 +19,12 @@ package defaultservice
import (
"encoding/hex"
+ "fmt"
"github.com/apache/incubator-milagro-dta/libs/documents"
"github.com/apache/incubator-milagro-dta/pkg/api"
"github.com/apache/incubator-milagro-dta/pkg/common"
- "github.com/apache/incubator-milagro-dta/pkg/tendermint"
+ "github.com/apache/incubator-milagro-dta/pkg/identity"
"github.com/pkg/errors"
)
@@ -38,7 +39,16 @@ func (s *Service) Order2(tx *api.BlockChainTX) (string, error) {
return "", err
}
- _, _, _, sikeSK, err := common.RetrieveIdentitySecrets(s.Store, nodeID)
+ // SIKE and BLS keys
+ keyseed, err := s.KeyStore.Get("seed")
+ if err != nil {
+ return "", err
+ }
+ _, sikeSK, err := identity.GenerateSIKEKeys(keyseed)
+ if err != nil {
+ return "", err
+ }
+ _, blsSK, err := identity.GenerateBLSKeys(keyseed)
if err != nil {
return "", err
}
@@ -73,7 +83,7 @@ func (s *Service) Order2(tx *api.BlockChainTX) (string, error) {
}
//Generate a transaction
- txHash, payload, err := common.CreateTX(nodeID, s.Store, nodeID, order, recipientList)
+ txHash, payload, err := common.CreateTX(nodeID, s.Store, blsSK, nodeID, order, recipientList)
//Write the Order2 results to the chain
chainTX := &api.BlockChainTX{
@@ -84,8 +94,8 @@ func (s *Service) Order2(tx *api.BlockChainTX) (string, error) {
Payload: payload,
Tags: map[string]string{"reference": order.Reference, "txhash": hex.EncodeToString(txHash)},
}
- return tendermint.PostToChain(chainTX, "Order2")
+ return s.Tendermint.PostTx(chainTX, "Order2")
}
// OrderSecret2 - Process an incoming Blockchain Order/Secret transaction from a MasterFiduciary, to generate the final secret
@@ -94,7 +104,16 @@ func (s *Service) OrderSecret2(tx *api.BlockChainTX) (string, error) {
reqPayload := tx.Payload
txHashString := hex.EncodeToString(tx.TXhash)
- _, _, _, sikeSK, err := common.RetrieveIdentitySecrets(s.Store, nodeID)
+ // SIKE and BLS keys
+ keyseed, err := s.KeyStore.Get("seed")
+ if err != nil {
+ return "", err
+ }
+ _, sikeSK, err := identity.GenerateSIKEKeys(keyseed)
+ if err != nil {
+ return "", err
+ }
+ _, blsSK, err := identity.GenerateBLSKeys(keyseed)
if err != nil {
return "", err
}
@@ -107,17 +126,16 @@ func (s *Service) OrderSecret2(tx *api.BlockChainTX) (string, error) {
//Decode the Order from the supplied TX
order := &documents.OrderDoc{}
err = documents.DecodeOrderDocument(reqPayload, txHashString, order, sikeSK, nodeID, remoteIDDoc.BLSPublicKey)
+ if err != nil {
+ fmt.Println("ERROR DEcode Order:", err)
+ return "", err
+ }
if order.BeneficiaryCID != nodeID {
return "", errors.New("Invalid Processor")
}
- _, seed, _, sikeSK, err := common.RetrieveIdentitySecrets(s.Store, nodeID)
- if err != nil {
- return "", err
- }
-
- finalPrivateKey, _, extension, err := s.Plugin.ProduceFinalSecret(seed, sikeSK, order, order, nodeID)
+ finalPrivateKey, _, extension, err := s.Plugin.ProduceFinalSecret(keyseed, sikeSK, order, order, nodeID)
if err != nil {
return "", err
}
@@ -140,7 +158,7 @@ func (s *Service) OrderSecret2(tx *api.BlockChainTX) (string, error) {
if err != nil {
return "", err
}
- txHash, payload, err := common.CreateTX(nodeID, s.Store, nodeID, order, recipientList)
+ txHash, payload, err := common.CreateTX(nodeID, s.Store, blsSK, nodeID, order, recipientList)
//Write the requests to the chain
chainTX := &api.BlockChainTX{
@@ -151,5 +169,6 @@ func (s *Service) OrderSecret2(tx *api.BlockChainTX) (string, error) {
Payload: payload,
Tags: map[string]string{"reference": order.Reference, "txhash": hex.EncodeToString(txHash)},
}
- return tendermint.PostToChain(chainTX, "OrderSecret2")
+
+ return s.Tendermint.PostTx(chainTX, "OrderSecret2")
}
diff --git a/pkg/defaultservice/service.go b/pkg/defaultservice/service.go
index 9299b92..2b9fe0d 100644
--- a/pkg/defaultservice/service.go
+++ b/pkg/defaultservice/service.go
@@ -34,7 +34,8 @@ import (
"github.com/apache/incubator-milagro-dta/libs/transport"
"github.com/apache/incubator-milagro-dta/pkg/api"
"github.com/apache/incubator-milagro-dta/pkg/common"
- "github.com/apache/incubator-milagro-dta/pkg/config"
+ "github.com/apache/incubator-milagro-dta/pkg/identity"
+ "github.com/apache/incubator-milagro-dta/pkg/tendermint"
"github.com/hokaccha/go-prettyjson"
)
@@ -52,6 +53,7 @@ type Service struct {
Store *datastore.Store
KeyStore keystore.Store
Ipfs ipfs.Connector
+ Tendermint *tendermint.NodeConnector
nodeID string
masterFiduciaryNodeID string
}
@@ -115,7 +117,12 @@ func (s *Service) Dump(tx *api.BlockChainTX) error {
return err
}
- _, _, _, sikeSK, err := common.RetrieveIdentitySecrets(s.Store, nodeID)
+ // SIKE and BLS keys
+ keyseed, err := s.KeyStore.Get("seed")
+ if err != nil {
+ return err
+ }
+ _, sikeSK, err := identity.GenerateSIKEKeys(keyseed)
if err != nil {
return err
}
@@ -127,6 +134,8 @@ func (s *Service) Dump(tx *api.BlockChainTX) error {
fmt.Println(string(pp))
return nil
+}
+
// Endpoints for extending the plugin endpoints
func (s *Service) Endpoints() (namespace string, endpoints transport.HTTPEndpoints) {
return s.Name(), nil
diff --git a/pkg/tendermint/cmd/cmd b/pkg/tendermint/cmd/cmd
deleted file mode 100755
index 0e07c4f..0000000
Binary files a/pkg/tendermint/cmd/cmd and /dev/null differ
diff --git a/pkg/tendermint/config.go b/pkg/tendermint/config.go
deleted file mode 100644
index 510189d..0000000
--- a/pkg/tendermint/config.go
+++ /dev/null
@@ -1,6 +0,0 @@
-package tendermint
-
-const (
- //node = "127.0.0.1:26657"
- node = "34.246.173.153:26657"
-)
diff --git a/pkg/tendermint/connector.go b/pkg/tendermint/connector.go
new file mode 100644
index 0000000..0dab29e
--- /dev/null
+++ b/pkg/tendermint/connector.go
@@ -0,0 +1,245 @@
+package tendermint
+
+import (
+ "context"
+ "encoding/base64"
+ "encoding/json"
+ "fmt"
+ "io/ioutil"
+ "net/http"
+ "strconv"
+ "strings"
+ "time"
+
+ "github.com/apache/incubator-milagro-dta/libs/datastore"
+ "github.com/apache/incubator-milagro-dta/libs/logger"
+ "github.com/apache/incubator-milagro-dta/pkg/api"
+ status "github.com/apache/incubator-milagro-dta/pkg/tendermint/status"
+ "github.com/pkg/errors"
+ tmclient "github.com/tendermint/tendermint/rpc/client"
+ tmtypes "github.com/tendermint/tendermint/types"
+)
+
+const (
+ nodeConnectionTimeout = time.Second * 10
+ txChanSize = 1000
+)
+
+// ProcessTXFunc is executed on each incoming TX
+type ProcessTXFunc func(tx *api.BlockChainTX) error
+
+// NodeConnector uses an external Tendermint node to post and get transactions
+type NodeConnector struct {
+ nodeID string
+ tmNodeAddr string
+ httpClient *http.Client
+ tmClient *tmclient.HTTP
+ log *logger.Logger
+ store *datastore.Store
+}
+
+// NewNodeConnector constructs a new Tendermint NodeConnector
+func NewNodeConnector(tmNodeAddr string, nodeID string, store *datastore.Store, log *logger.Logger) (conn *NodeConnector, err error) {
+ defer func() {
+ if r := recover(); r != nil {
+ err = errors.Errorf("Initialize tendermint node connector: %v", r)
+ }
+ }()
+
+ tmNodeAddr = strings.TrimRight(tmNodeAddr, "/")
+ tmClient := tmclient.NewHTTP(fmt.Sprintf("tcp://%s", tmNodeAddr), "/websocket")
+ if err := tmClient.Start(); err != nil {
+ return nil, errors.Wrap(err, "Start tendermint client")
+ }
+
+ return &NodeConnector{
+ tmNodeAddr: tmNodeAddr,
+ nodeID: nodeID,
+ log: log,
+ store: store,
+ httpClient: &http.Client{
+ Timeout: nodeConnectionTimeout,
+ },
+ tmClient: tmClient,
+ }, nil
+
+}
+
+// Stop performs clean-up by stopping the underlying Tendermint client
+func (nc *NodeConnector) Stop() error {
+ return nc.tmClient.Stop()
+}
+
+// GetTx retrieves a transaction by hash
+func (nc *NodeConnector) GetTx(txHash string) (*api.BlockChainTX, error) {
+ query := fmt.Sprintf("tag.txhash='%s'", txHash)
+ result, err := nc.tmClient.TxSearch(query, true, 1, 1)
+ if err != nil {
+ return nil, err
+ }
+ if len(result.Txs) == 0 {
+ return nil, errors.New("Transaction not found")
+ }
+
+ payload := &api.BlockChainTX{}
+ if err := json.Unmarshal(result.Txs[0].Tx, &payload); err != nil {
+ return nil, err
+ }
+
+ return payload, nil
+}
+
+// PostTx posts a transaction to the chain and returns the transaction ID
+func (nc *NodeConnector) PostTx(tx *api.BlockChainTX, method string) (txID string, err error) {
+ txID = tx.CalcHash()
+
+ //serialize the whole transaction
+ serializedTX, err := json.Marshal(tx)
+ if err != nil {
+ return
+ }
+ base64EncodedTX := base64.StdEncoding.EncodeToString(serializedTX)
+
+ // TODO: use net/rpc
+ body := strings.NewReader(`{
+ "jsonrpc": "2.0",
+ "id": "anything",
+ "method": "broadcast_tx_commit",
+ "params": {
+ "tx": "` + base64EncodedTX + `"}
+ }`)
+ url := "http://" + nc.tmNodeAddr
+
+ req, err := http.NewRequest("POST", url, body)
+ if err != nil {
+ return "", errors.Wrap(err, "post to blockchain node")
+ }
+ req.Header.Set("Content-Type", "text/plain;")
+
+ resp, err := nc.httpClient.Do(req)
+ if err != nil {
+ return "", errors.Wrap(err, "post to blockchain node")
+ }
+ defer resp.Body.Close()
+
+ if resp.StatusCode != http.StatusOK {
+ var respErr string
+ if b, err := ioutil.ReadAll(resp.Body); err != nil {
+ respErr = resp.Status
+ } else {
+ respErr = string(b)
+ }
+
+ return "", errors.Errorf("Post to blockchain node status %v: %v", resp.StatusCode, respErr)
+ }
+
+ nc.log.Debug("POST TO CHAIN: METHOD: %s CALLS: %s - TXID: %s", method, tx.Processor, txID)
+
+ return
+}
+
+// Subscribe connects to the Tendermint node and collects the events
+func (nc *NodeConnector) Subscribe(ctx context.Context, processFn ProcessTXFunc) error {
+ chainStatus, err := nc.getChainStatus()
+ if err != nil {
+ return err
+ }
+
+ currentBlockHeight, err := strconv.Atoi(chainStatus.Result.SyncInfo.LatestBlockHeight)
+ if err != nil {
+ return errors.Wrap(err, "Failed to obtain latest blockheight of Blockchain")
+ }
+
+ var processedToHeight int
+ if err := nc.store.Get("chain", "height", &processedToHeight); err != nil {
+ if err != datastore.ErrKeyNotFound {
+ return errors.Wrap(err, "Get last processed block height")
+ }
+ }
+
+ nc.log.Debug("Block height: Current: %v; Processed: %v", currentBlockHeight, processedToHeight)
+
+ // create the transaction queue chan
+ txQueue := make(chan *api.BlockChainTX, txChanSize)
+
+ // Collect events
+ if err := nc.subscribeAndQueue(ctx, txQueue); err != nil {
+ return err
+ }
+
+ // TODO: load historicTX
+
+ // Process events
+ return nc.processTXQueue(ctx, txQueue, processFn)
+}
+
+func (nc *NodeConnector) subscribeAndQueue(ctx context.Context, txQueue chan *api.BlockChainTX) error {
+ query := "tag.recipient='" + nc.nodeID + "'"
+
+ out, err := nc.tmClient.Subscribe(context.Background(), "test", query, 1000)
+ if err != nil {
+ return errors.Wrapf(err, "Failed to subscribe to query %s", query)
+ }
+
+ go func() {
+ for {
+ select {
+ case result := <-out:
+ tx := result.Data.(tmtypes.EventDataTx).Tx
+ payload := &api.BlockChainTX{}
+ err := json.Unmarshal(tx, payload)
+ if err != nil {
+ nc.log.Debug("IGNORED TX - Invalid!")
+ break
+ }
+
+ //check if this node is in recipient list
+ if payload.RecipientID != nc.nodeID {
+ nc.log.Debug("IGNORED TX! Recipient not match the query! (%v != %v)", payload.RecipientID, nc.nodeID)
+ break
+ }
+
+ //Add into the waitingQueue for later processing
+ txQueue <- payload
+ case <-ctx.Done():
+ return
+ }
+ }
+ }()
+
+ return nil
+}
+
+func (nc *NodeConnector) processTXQueue(ctx context.Context, txQueue chan *api.BlockChainTX, processFn ProcessTXFunc) error {
+ for {
+ select {
+ case tx := <-txQueue:
+ if err := processFn(tx); err != nil {
+ // TODO: errors block processing the queue
+ return err
+ }
+ // TODO: store the last block height
+ case <-ctx.Done():
+ return nil
+ }
+ }
+}
+
+func (nc *NodeConnector) getChainStatus() (*status.StatusResponse, error) {
+ url := fmt.Sprintf("http://%s/status", nc.tmNodeAddr)
+ resp, err := nc.httpClient.Get(url)
+ if err != nil {
+ return nil, errors.Wrap(err, "Get node status")
+ }
+ defer resp.Body.Close()
+ if resp.StatusCode != http.StatusOK {
+ return nil, errors.Errorf("Get node status status code: %v", resp.StatusCode)
+ }
+
+ status := &status.StatusResponse{}
+ if err := json.NewDecoder(resp.Body).Decode((&status)); err != nil {
+ return nil, errors.Wrap(err, "Invalid node status response")
+ }
+
+ return status, nil
+}
diff --git a/pkg/tendermint/tendermint.go b/pkg/tendermint/tendermint.go
deleted file mode 100644
index e02b3c1..0000000
--- a/pkg/tendermint/tendermint.go
+++ /dev/null
@@ -1,165 +0,0 @@
-package tendermint
-
-import (
- "bufio"
- "crypto/sha256"
- "encoding/base64"
- "encoding/hex"
- "encoding/json"
- "errors"
- "fmt"
- "net/http"
- "strings"
-
- "github.com/apache/incubator-milagro-dta/pkg/api"
- "github.com/apache/incubator-milagro-dta/pkg/service"
-)
-
-//QueryChain the blockchain for an index
-func QueryChain(index string) (string, string) {
- url := "http://" + node + "/abci_query?data=\"" + index + "\""
- resp, err := http.Get(url)
- if err != nil {
- // handle err
- }
- defer resp.Body.Close()
- scanner := bufio.NewScanner(resp.Body)
- scanner.Split(bufio.ScanBytes)
- t := ""
- for scanner.Scan() {
- t += scanner.Text()
- ///fmt.Print(scanner.Text())
- }
-
- res, _ := UnmarshalChainQuery([]byte(t))
-
- val := res.Result.Response.Value
- decodeVal, _ := base64.StdEncoding.DecodeString(val)
- return string(decodeVal), val
-}
-
-//PostToChain - send TX data to the Blockchain
-func PostToChain(tx *api.BlockChainTX, method string) (string, error) {
- //Create TX Hash
-
- tx.RecipientID = tx.RecipientID
-
- TXID := sha256.Sum256(tx.Payload)
- TXIDhex := hex.EncodeToString(TXID[:])
- tx.TXhash = TXID[:]
-
- //serialize the whole transaction
- serializedTX, _ := json.Marshal(tx)
- base64EncodedTX := base64.StdEncoding.EncodeToString(serializedTX)
-
- body := strings.NewReader("{\"jsonrpc\":\"2.0\",\"id\":\"anything\",\"method\":\"broadcast_tx_commit\",\"params\": {\"tx\": \"" + base64EncodedTX + "\"}}")
- url := "http://" + node + ""
-
- req, err := http.NewRequest("POST", url, body)
- if err != nil {
- print("Error posting to Blockchain")
- return "", err
- }
- req.Header.Set("Content-Type", "text/plain;")
-
- resp, err := http.DefaultClient.Do(req)
- if err != nil {
- print("Error posting to Blockchain")
- return "", err
- }
- defer resp.Body.Close()
- fmt.Printf("POST TO CHAIN: METHOD:%s CALLS:%s - TXID:%s\n", method, tx.Processor, TXIDhex)
- return TXIDhex, nil
-}
-
-//HandleChainTX -
-func HandleChainTX(myID string, tx string) error {
- blockChainTX, err := decodeChainTX(tx)
- if err != nil {
- return err
- }
- panic(nil)
- err = callNextTX(nil, blockChainTX, "5556")
- if err != nil {
- return err
- }
- return nil
-}
-
-//DecodeChainTX - Decode the On Chain TX into a BlockChainTX object
-func decodeChainTX(payload string) (*api.BlockChainTX, error) {
- base64DecodedTX, _ := base64.StdEncoding.DecodeString(payload)
- tx := &api.BlockChainTX{}
-
- err := json.Unmarshal(base64DecodedTX, tx)
- if err != nil {
- return &api.BlockChainTX{}, err
- }
- return tx, nil
-}
-
-//DecodeChainTX - Decode the On Chain TX into a BlockChainTX object
-func decodeTX(payload string) (*api.BlockChainTX, string, error) {
- tx := &api.BlockChainTX{}
- parts := strings.SplitN(payload, "=", 2)
- if len(parts) != 2 {
- return &api.BlockChainTX{}, "", errors.New("Invalid TX payload")
- }
- hash := string(parts[0])
- err := json.Unmarshal([]byte(parts[1]), tx)
- if err != nil {
- return &api.BlockChainTX{}, "", err
- }
- return tx, hash, nil
-}
-
-func callNextTX(svc service.Service, tx *api.BlockChainTX, listenPort string) error {
- switch tx.Processor {
- case "none":
- return nil
- case "dump":
- svc.Dump(tx)
- case "v1/fulfill/order":
- svc.FulfillOrder(tx)
- case "v1/order2":
- svc.Order2(tx)
- case "v1/fulfill/order/secret":
- svc.FulfillOrderSecret(tx)
- case "v1/order/secret2":
- svc.OrderSecret2(tx)
-
- default:
- return errors.New("Unknown processor")
- }
- return nil
-}
-
-//DumpTXID -
-func DumpTXID(txid string) {
- value, raw := QueryChain(txid)
- println(value)
- bc, _ := decodeChainTX(raw)
- println(string(bc.Payload))
- println()
-}
-
-//ProcessTransactionID -
-func ProcessTransactionID(txid string) {
- _, payload := QueryChain((txid))
- err := HandleChainTX("", payload)
- if err != nil {
- panic(err)
- }
-}
-
-func unique(stringSlice []string) []string {
- keys := make(map[string]bool)
- list := []string{}
- for _, entry := range stringSlice {
- if _, value := keys[entry]; !value {
- keys[entry] = true
- list = append(list, entry)
- }
- }
- return list
-}
diff --git a/pkg/tendermint/tendermint_test.go b/pkg/tendermint/tendermint_test.go
deleted file mode 100644
index 495f74f..0000000
--- a/pkg/tendermint/tendermint_test.go
+++ /dev/null
@@ -1,60 +0,0 @@
-package tendermint
-
-import "testing"
-
-var (
- nodeID = "QmT4y4MtV5mvPHkFjfUQYQ7h1WvAagMy2GTJCn2bF8DQb7"
-)
-
-func Test_Order1(t *testing.T) {
- a := "eyJQcm9jZXNzb3IiOiJ2MS9mdWxmaWxsL29yZGVyIiwiU2VuZGVySUQiOiJRbVQ0eTRNdFY1bXZQSGtGamZVUVlRN2gxV3ZBYWdNeTJHVEpDbjJiRjhEUWI3IiwiUmVjaXBpZW50SUQiOiJRbVQ0eTRNdFY1bXZQSGtGamZVUVlRN2gxV3ZBYWdNeTJHVEpDbjJiRjhEUWI3IiwiUGF5bG9hZCI6ImV5SnZjbVJsY2xCaGNuUXhRMGxFSWpvaVVXMVpVRU5xVEVGME1tbzVVbWhxU0U1TVkwRnVObEF5WTJseVJHWjZTRlpFWTBwMFkzbGtUVFZ5VWxoM1V5SXNJbVJ2WTNWdFpXNTBRMGxFSWpvaVVXMVVOSGswVFhSV05XMTJVRWhyUm1wbVZWRlpVVGRvTVZkMlFXRm5UWGt5UjFSS1EyNHlZa1k0UkZGaU55SjkifQ=="
- err := HandleChainTX(nodeID, a)
- if err != nil {
- panic(err)
- }
-}
-
-func Test_FullFill(t *testing.T) {
- a := "eyJQcm9jZXNzb3IiOiJPUkRFUl9SRVNQT05TRSIsIlNlbmRlcklEIjoiUW1UNHk0TXRWNW12UEhrRmpmVVFZUTdoMVd2QWFnTXkyR1RKQ24yYkY4RFFiNyIsIlJlY2lwaWVudElEIjoiUW1UNHk0TXRWNW12UEhrRmpmVVFZUTdoMVd2QWFnTXkyR1RKQ24yYkY4RFFiNyIsIlBheWxvYWQiOiJleUp2Y21SbGNsQmhjblF5UTBsRUlqb2lVVzFVZUZka1ltZEdhRGxHYWpGMlJIbFhlazVCWkROVmFuRjNlVEYyTkRsRlFtVjJhRzUyTVVWdk5HVllSaUo5In0="
- err := HandleChainTX(nodeID, a)
- if err != nil {
- panic(err)
- }
-
-}
-
-func Test_DumpTXID(t *testing.T) {
- a := "5fe5823c0d8b6d49f2ac99c90575566962ac3a14a6b2f1e7fe7ea1099b7b3bbd"
- value, raw := QueryChain(a)
- println(value)
- bc, _ := decodeChainTX(raw)
- print(string(bc.Payload))
-}
-
-//Use this to generate Order1
-//curl -s -X POST "http://localhost:5556/v1/order1" -H "accept: */*" -H "Content-Type: application/json" -d "{\"beneficiaryIDDocumentCID\":\"\",\"extension\":{\"coin\":\"0\"}}"
-
-func Test_All(t *testing.T) {
- DumpTXID("dea1396bce7890f85da7dc86b4ece5c4d372886ed08948eca6a0beee36c412e0")
-
-}
-
-func Test_1(t *testing.T) {
- txid := "473407b069ff917b110f38c36d5b9e5246b5ace5d82df38c5a188d5ac868cfec"
- DumpTXID(txid)
- ProcessTransactionID(txid)
-}
-
-func Test_2(t *testing.T) {
- txid := "586bc14b15a31999571c8188241beef046d3b78a9481ecee984e7c76a1d95112"
- DumpTXID(txid)
- ProcessTransactionID(txid)
-}
-
-func Test_3(t *testing.T) {
- txid := "5a48129fd272f2a8c57fdd96716a78c3be55a3cf811b179e82e54221d95ccbc4"
- DumpTXID(txid)
- ProcessTransactionID(txid)
-}
-
-//curl -s -X POST "http://localhost:5556/v1/order1" -H "accept: */*" -H "Content-Type: application/json" -d "{\"beneficiaryIDDocumentCID\":\"\",\"extension\":{\"coin\":\"0\"}}"
diff --git a/pkg/tendermint/websockets.go b/pkg/tendermint/websockets.go
deleted file mode 100644
index 472ea95..0000000
--- a/pkg/tendermint/websockets.go
+++ /dev/null
@@ -1,204 +0,0 @@
-package tendermint
-
-import (
- "context"
- "encoding/json"
- "fmt"
- "net/http"
- "os"
- "os/signal"
- "strconv"
- "syscall"
-
- "github.com/apache/incubator-milagro-dta/libs/datastore"
- "github.com/apache/incubator-milagro-dta/libs/logger"
- "github.com/apache/incubator-milagro-dta/pkg/api"
- "github.com/apache/incubator-milagro-dta/pkg/service"
- status "github.com/apache/incubator-milagro-dta/pkg/tendermint/status"
- "github.com/pkg/errors"
- tmclient "github.com/tendermint/tendermint/rpc/client"
- ctypes "github.com/tendermint/tendermint/rpc/core/types"
- tmtypes "github.com/tendermint/tendermint/types"
-)
-
-func catchUp(quene chan tmtypes.Tx, store *datastore.Store, logger *logger.Logger, nodeID string, listenPort string, height int) error {
- print("catch up")
- return nil
-}
-
-//Subscribe to Websocket and add to queue
-func subscribeAndQueue(queueWaiting chan api.BlockChainTX, logger *logger.Logger, nodeID string, listenPort string, blockchainNode string) error {
- client := tmclient.NewHTTP("tcp://"+blockchainNode+"", "/websocket")
- //client.SetLogger(tmlogger)
- err := client.Start()
- if err != nil {
- logger.Info("Failed to start Tendermint HTTP client %s", err)
- return err
- }
- defer client.Stop()
-
- //curl "34.246.173.153:26657/tx_search?query=\"tag.part=4%20AND%20tag.reference='579a2864-e100-11e9-aaf4-acde48001122'\""
- query := "tag.recipient='" + nodeID + "'"
- //query := "tm.event = 'Tx'"
-
- out, err := client.Subscribe(context.Background(), "test", query, 1000)
- if err != nil {
- logger.Info("Failed to subscribe to query %s %s", query, err)
- return err
- }
-
- logger.Info("Tendermint: Connected")
-
- quit := make(chan os.Signal, 1)
- signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
- for {
- select {
- case result := <-out:
- tx := result.Data.(tmtypes.EventDataTx).Tx
- payload := api.BlockChainTX{}
- err := json.Unmarshal(tx, &payload)
- if err != nil {
- logger.Info("******** Invalid TX - ignored")
- break
- }
-
- //check if this node is in receipient list
- if payload.RecipientID != nodeID {
- logger.Info("******** Invalid Recipient - why are we receiving this TX?")
- break
-
- }
-
- //Add into the waitingQueue for later processing
- queueWaiting <- payload
- //fmt.Printf("Incoming Transaction:%d \n", len(queueWaiting))
-
- case <-quit:
- os.Exit(0)
- }
- }
- return nil
-}
-
-func TXbyHash(TXHash string) (api.BlockChainTX, error) {
- client := tmclient.NewHTTP("tcp://"+node+"", "/websocket")
- query := fmt.Sprintf("tag.txhash='%s'", TXHash)
- result, err := client.TxSearch(query, true, 1, 1)
-
- if len(result.Txs) == 0 {
- return api.BlockChainTX{}, errors.New("Not found")
- }
-
- payload := api.BlockChainTX{}
- err = json.Unmarshal(result.Txs[0].Tx, &payload)
-
- _ = payload
-
- if err != nil {
- return payload, err
- }
- //
- // res := result.Txs[0]
- // tx := res.Tx
- return payload, nil
-
-}
-
-//loadAllHistoricTX - load the history for this node into a queue
-func loadAllHistoricTX(start int, end int, txHistory []ctypes.ResultTx, nodeID string, listenPort string) error {
- //cycle through the historic transactions page by page
- //Get all transactions that claim to be from me - check signatures
- //Get all transactions that claim to be to me -
-
- client := tmclient.NewHTTP("tcp://"+node+"", "/websocket")
- currentPage := 1
- query := fmt.Sprintf("tag.recipient='%v' AND tag.sender='%v' AND tx.height>%d AND tx.height<=%d", nodeID, nodeID, start, end)
- numPerPage := 5
-
- for {
- result, err := client.TxSearch(query, true, currentPage, numPerPage)
- if err != nil {
- return errors.New("Failed to query chain for transaction history")
- }
-
- for _, tx := range result.Txs {
- txHistory = append(txHistory, *tx)
- }
- if currentPage*numPerPage > result.TotalCount {
- break
- }
- currentPage++
- }
- parseHistory(txHistory)
- return nil
-}
-
-func parseHistory(txHistory []ctypes.ResultTx) {
- txCount := len(txHistory)
-
- //loop backwards
- for i := txCount - 1; i >= 0; i-- {
- resTx := txHistory[i]
- tx := resTx.Tx
-
- //Decode TX into BlockchainTX Object
- payload := api.BlockChainTX{}
- err := json.Unmarshal(tx, &payload)
- if err != nil {
- msg := fmt.Sprintf("Invalid Transaction Hash:%v Height:%v Index:% \n", resTx.Hash, resTx.Height, resTx.Index)
- print(msg)
- continue
- }
- //Decode BlockchainTX.payload into Protobuffer Qredo
- // TODO:
- // Parse the incoming TX, check sig
- // If from self, can assume correct
- // builds transaction chains using previous transactionHash
- // Ensure every
- // Check recipient/sender in tags are correct
- //
- _ = payload
- }
- print("Finished loading - but not parsing the History\n")
-}
-
-func processTXQueue(svc service.Service, queue chan api.BlockChainTX, listenPort string) {
- print("Processing queue\n")
- for payload := range queue {
- callNextTX(svc, &payload, listenPort)
- }
-}
-
-//Subscribe - Connect to the Tendermint websocket to collect events
-func Subscribe(svc service.Service, store *datastore.Store, logger *logger.Logger, nodeID string, listenPort string) error {
-
- latestStatus, _ := getChainStatus(node)
- currentBlockHeight, err := strconv.Atoi(latestStatus.Result.SyncInfo.LatestBlockHeight)
-
- if err != nil {
- return errors.New("Failed to obtain latest blockheight of Blockchain")
- }
-
- var processedToHeight int
- store.Get("chain", "height", &processedToHeight)
-
- //first catch up to Tip of chain
- var txHistory []ctypes.ResultTx
- queueWaiting := make(chan api.BlockChainTX, 1000)
-
- //while we are processessing the history save all new transactions in a queue for later
- go subscribeAndQueue(queueWaiting, logger, nodeID, listenPort, node)
- loadAllHistoricTX(processedToHeight, currentBlockHeight, txHistory, nodeID, listenPort)
- processTXQueue(svc, queueWaiting, listenPort)
- return nil
-}
-
-func getChainStatus(node string) (status.StatusResponse, error) {
- resp, err := http.Get("http://" + node + "/status")
- result := status.StatusResponse{}
- if err != nil {
- return result, err
- }
- json.NewDecoder(resp.Body).Decode((&result))
- return result, nil
-}