You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by he...@apache.org on 2021/04/09 07:21:40 UTC
[beam] branch master updated: [BEAM-12141] Print sha256 and size when downloading artifacts via artifact retrieval service
This is an automated email from the ASF dual-hosted git repository.
heejong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new b28373b [BEAM-12141] Print sha256 and size when downloading artifacts via artifact retrieval service
new 974c2de Merge pull request #14492 from ihji/BEAM-12141
b28373b is described below
commit b28373b6ca9e251a6b3b0ca234f8e12720301838
Author: Heejong Lee <he...@gmail.com>
AuthorDate: Thu Apr 8 21:15:40 2021 -0700
[BEAM-12141] Print sha256 and size when downloading artifacts via artifact retrieval service
---
sdks/go/pkg/beam/artifact/materialize.go | 18 ++++++++++++------
1 file changed, 12 insertions(+), 6 deletions(-)
diff --git a/sdks/go/pkg/beam/artifact/materialize.go b/sdks/go/pkg/beam/artifact/materialize.go
index 7f975a2..c730f51 100644
--- a/sdks/go/pkg/beam/artifact/materialize.go
+++ b/sdks/go/pkg/beam/artifact/materialize.go
@@ -204,7 +204,7 @@ func (a artifact) retrieve(ctx context.Context, dest string) error {
}
w := bufio.NewWriter(fd)
- err = writeChunks(stream, w)
+ sha256Hash, err := writeChunks(stream, w)
if err != nil {
fd.Close() // drop any buffered content
return errors.Wrapf(err, "failed to retrieve chunk for %v", filename)
@@ -213,24 +213,30 @@ func (a artifact) retrieve(ctx context.Context, dest string) error {
fd.Close()
return errors.Wrapf(err, "failed to flush chunks for %v", filename)
}
+ stat, _ := fd.Stat()
+ log.Printf("Downloaded: %v (sha256: %v, size: %v)", filename, sha256Hash, stat.Size())
+
return fd.Close()
}
-func writeChunks(stream jobpb.ArtifactRetrievalService_GetArtifactClient, w io.Writer) error {
+func writeChunks(stream jobpb.ArtifactRetrievalService_GetArtifactClient, w io.Writer) (string, error) {
+ sha256W := sha256.New()
for {
chunk, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
- return err
+ return "", err
+ }
+ if _, err := sha256W.Write(chunk.Data); err != nil {
+ panic(err) // cannot fail
}
-
if _, err := w.Write(chunk.Data); err != nil {
- return errors.Wrapf(err, "chunk write failed")
+ return "", errors.Wrapf(err, "chunk write failed")
}
}
- return nil
+ return hex.EncodeToString(sha256W.Sum(nil)), nil
}
func legacyMaterialize(ctx context.Context, endpoint string, rt string, dest string) ([]*pipepb.ArtifactInformation, error) {