You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text version.
Posted to commits@beam.apache.org by he...@apache.org on 2021/04/09 07:21:40 UTC

[beam] branch master updated: [BEAM-12141] Print sha256 and size when downloading artifacts via artifact retrieval service

This is an automated email from the ASF dual-hosted git repository.

heejong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new b28373b  [BEAM-12141] Print sha256 and size when downloading artifacts via artifact retrieval service
     new 974c2de  Merge pull request #14492 from ihji/BEAM-12141
b28373b is described below

commit b28373b6ca9e251a6b3b0ca234f8e12720301838
Author: Heejong Lee <he...@gmail.com>
AuthorDate: Thu Apr 8 21:15:40 2021 -0700

    [BEAM-12141] Print sha256 and size when downloading artifacts via artifact retrieval service
---
 sdks/go/pkg/beam/artifact/materialize.go | 18 ++++++++++++------
 1 file changed, 12 insertions(+), 6 deletions(-)

diff --git a/sdks/go/pkg/beam/artifact/materialize.go b/sdks/go/pkg/beam/artifact/materialize.go
index 7f975a2..c730f51 100644
--- a/sdks/go/pkg/beam/artifact/materialize.go
+++ b/sdks/go/pkg/beam/artifact/materialize.go
@@ -204,7 +204,7 @@ func (a artifact) retrieve(ctx context.Context, dest string) error {
 	}
 	w := bufio.NewWriter(fd)
 
-	err = writeChunks(stream, w)
+	sha256Hash, err := writeChunks(stream, w)
 	if err != nil {
 		fd.Close() // drop any buffered content
 		return errors.Wrapf(err, "failed to retrieve chunk for %v", filename)
@@ -213,24 +213,30 @@ func (a artifact) retrieve(ctx context.Context, dest string) error {
 		fd.Close()
 		return errors.Wrapf(err, "failed to flush chunks for %v", filename)
 	}
+	stat, _ := fd.Stat()
+	log.Printf("Downloaded: %v (sha256: %v, size: %v)", filename, sha256Hash, stat.Size())
+
 	return fd.Close()
 }
 
-func writeChunks(stream jobpb.ArtifactRetrievalService_GetArtifactClient, w io.Writer) error {
+func writeChunks(stream jobpb.ArtifactRetrievalService_GetArtifactClient, w io.Writer) (string, error) {
+	sha256W := sha256.New()
 	for {
 		chunk, err := stream.Recv()
 		if err == io.EOF {
 			break
 		}
 		if err != nil {
-			return err
+			return "", err
+		}
+		if _, err := sha256W.Write(chunk.Data); err != nil {
+			panic(err) // cannot fail
 		}
-
 		if _, err := w.Write(chunk.Data); err != nil {
-			return errors.Wrapf(err, "chunk write failed")
+			return "", errors.Wrapf(err, "chunk write failed")
 		}
 	}
-	return nil
+	return hex.EncodeToString(sha256W.Sum(nil)), nil
 }
 
 func legacyMaterialize(ctx context.Context, endpoint string, rt string, dest string) ([]*pipepb.ArtifactInformation, error) {