You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2020/03/06 00:47:49 UTC

[beam] branch revert-10942-metrics7 created (now 1bcaaaa)

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a change to branch revert-10942-metrics7
in repository https://gitbox.apache.org/repos/asf/beam.git.


      at 1bcaaaa  Revert "[BEAM-6374] Emit PCollection metrics from GoSDK (#10942)"

This branch includes the following new commits:

     new 1bcaaaa  Revert "[BEAM-6374] Emit PCollection metrics from GoSDK (#10942)"

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[beam] 01/01: Revert "[BEAM-6374] Emit PCollection metrics from GoSDK (#10942)"

Posted by lo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch revert-10942-metrics7
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 1bcaaaa0ae840af38ed9ba523d08180e252787ee
Author: Robert Burke <lo...@users.noreply.github.com>
AuthorDate: Thu Mar 5 16:47:33 2020 -0800

    Revert "[BEAM-6374] Emit PCollection metrics from GoSDK (#10942)"
    
    This reverts commit ded686a58ad4747e91a26d3e59f61019b641e655.
---
 sdks/go/pkg/beam/core/runtime/exec/datasource.go   |  77 +++--------
 .../pkg/beam/core/runtime/exec/datasource_test.go  |  39 ++----
 sdks/go/pkg/beam/core/runtime/exec/pcollection.go  | 153 --------------------
 .../pkg/beam/core/runtime/exec/pcollection_test.go | 154 ---------------------
 sdks/go/pkg/beam/core/runtime/exec/plan.go         |  75 +++++-----
 sdks/go/pkg/beam/core/runtime/exec/translate.go    |  31 +----
 sdks/go/pkg/beam/core/runtime/harness/harness.go   |  24 +---
 .../go/pkg/beam/core/runtime/harness/monitoring.go |  35 ++---
 sdks/go/pkg/beam/runners/direct/direct.go          |   3 +-
 9 files changed, 100 insertions(+), 491 deletions(-)

diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource.go b/sdks/go/pkg/beam/core/runtime/exec/datasource.go
index fb20aea..ccc4a09 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/datasource.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/datasource.go
@@ -36,14 +36,14 @@ type DataSource struct {
 	Name  string
 	Coder *coder.Coder
 	Out   Node
-	PCol  PCollection // Handles size metrics. Value instead of pointer so it's initialized by default in tests.
 
 	source DataManager
 	state  StateReader
-
-	index    int64
-	splitIdx int64
-	start    time.Time
+	// TODO(lostluck) 2020/02/06: refactor to support more general PCollection metrics on nodes.
+	outputPID string // The index is the output count for the PCollection.
+	index     int64
+	splitIdx  int64
+	start     time.Time
 
 	mu sync.Mutex
 }
@@ -70,29 +70,6 @@ func (n *DataSource) StartBundle(ctx context.Context, id string, data DataContex
 	return n.Out.StartBundle(ctx, id, data)
 }
 
-// ByteCountReader is a passthrough reader that counts all the bytes read through it.
-// It trusts the nested reader to return accurate byte information.
-type byteCountReader struct {
-	count  *int
-	reader io.ReadCloser
-}
-
-func (r *byteCountReader) Read(p []byte) (int, error) {
-	n, err := r.reader.Read(p)
-	*r.count += n
-	return n, err
-}
-
-func (r *byteCountReader) Close() error {
-	return r.reader.Close()
-}
-
-func (r *byteCountReader) reset() int {
-	c := *r.count
-	*r.count = 0
-	return c
-}
-
 // Process opens the data source, reads and decodes data, kicking off element processing.
 func (n *DataSource) Process(ctx context.Context) error {
 	r, err := n.source.OpenRead(ctx, n.SID)
@@ -100,9 +77,6 @@ func (n *DataSource) Process(ctx context.Context) error {
 		return err
 	}
 	defer r.Close()
-	n.PCol.resetSize() // initialize the size distribution for this bundle.
-	var byteCount int
-	bcr := byteCountReader{reader: r, count: &byteCount}
 
 	c := coder.SkipW(n.Coder)
 	wc := MakeWindowDecoder(n.Coder.Window)
@@ -125,7 +99,6 @@ func (n *DataSource) Process(ctx context.Context) error {
 		if n.incrementIndexAndCheckSplit() {
 			return nil
 		}
-		// TODO(lostluck) 2020/02/22: Should we include window headers or just count the element sizes?
 		ws, t, err := DecodeWindowedValueHeader(wc, r)
 		if err != nil {
 			if err == io.EOF {
@@ -135,7 +108,7 @@ func (n *DataSource) Process(ctx context.Context) error {
 		}
 
 		// Decode key or parallel element.
-		pe, err := cp.Decode(&bcr)
+		pe, err := cp.Decode(r)
 		if err != nil {
 			return errors.Wrap(err, "source decode failed")
 		}
@@ -144,7 +117,7 @@ func (n *DataSource) Process(ctx context.Context) error {
 
 		var valReStreams []ReStream
 		for _, cv := range cvs {
-			values, err := n.makeReStream(ctx, pe, cv, &bcr)
+			values, err := n.makeReStream(ctx, pe, cv, r)
 			if err != nil {
 				return err
 			}
@@ -154,15 +127,11 @@ func (n *DataSource) Process(ctx context.Context) error {
 		if err := n.Out.ProcessElement(ctx, pe, valReStreams...); err != nil {
 			return err
 		}
-		// Collect the actual size of the element, and reset the bytecounter reader.
-		n.PCol.addSize(int64(bcr.reset()))
-		bcr.reader = r
 	}
 }
 
-func (n *DataSource) makeReStream(ctx context.Context, key *FullValue, cv ElementDecoder, bcr *byteCountReader) (ReStream, error) {
-	// TODO(lostluck) 2020/02/22: Do we include the chunk size, or just the element sizes?
-	size, err := coder.DecodeInt32(bcr.reader)
+func (n *DataSource) makeReStream(ctx context.Context, key *FullValue, cv ElementDecoder, r io.ReadCloser) (ReStream, error) {
+	size, err := coder.DecodeInt32(r)
 	if err != nil {
 		return nil, errors.Wrap(err, "stream size decoding failed")
 	}
@@ -171,16 +140,16 @@ func (n *DataSource) makeReStream(ctx context.Context, key *FullValue, cv Elemen
 	case size >= 0:
 		// Single chunk streams are fully read in and buffered in memory.
 		buf := make([]FullValue, 0, size)
-		buf, err = readStreamToBuffer(cv, bcr, int64(size), buf)
+		buf, err = readStreamToBuffer(cv, r, int64(size), buf)
 		if err != nil {
 			return nil, err
 		}
 		return &FixedReStream{Buf: buf}, nil
-	case size == -1:
+	case size == -1: // Shouldn't this be 0?
 		// Multi-chunked stream.
 		var buf []FullValue
 		for {
-			chunk, err := coder.DecodeVarInt(bcr.reader)
+			chunk, err := coder.DecodeVarInt(r)
 			if err != nil {
 				return nil, errors.Wrap(err, "stream chunk size decoding failed")
 			}
@@ -190,17 +159,17 @@ func (n *DataSource) makeReStream(ctx context.Context, key *FullValue, cv Elemen
 				return &FixedReStream{Buf: buf}, nil
 			case chunk > 0: // Non-zero chunk, read that many elements from the stream, and buffer them.
 				chunkBuf := make([]FullValue, 0, chunk)
-				chunkBuf, err = readStreamToBuffer(cv, bcr, chunk, chunkBuf)
+				chunkBuf, err = readStreamToBuffer(cv, r, chunk, chunkBuf)
 				if err != nil {
 					return nil, err
 				}
 				buf = append(buf, chunkBuf...)
 			case chunk == -1: // State backed iterable!
-				chunk, err := coder.DecodeVarInt(bcr.reader)
+				chunk, err := coder.DecodeVarInt(r)
 				if err != nil {
 					return nil, err
 				}
-				token, err := ioutilx.ReadN(bcr.reader, (int)(chunk))
+				token, err := ioutilx.ReadN(r, (int)(chunk))
 				if err != nil {
 					return nil, err
 				}
@@ -212,9 +181,6 @@ func (n *DataSource) makeReStream(ctx context.Context, key *FullValue, cv Elemen
 							if err != nil {
 								return nil, err
 							}
-							// We can't re-use the original bcr, since we may get new iterables,
-							// or multiple of them at the same time, but we can re-use the count itself.
-							r = &byteCountReader{reader: r, count: bcr.count}
 							return &elementStream{r: r, ec: cv}, nil
 						},
 					},
@@ -275,11 +241,12 @@ func (n *DataSource) incrementIndexAndCheckSplit() bool {
 }
 
 // ProgressReportSnapshot captures the progress reading an input source.
+//
+// TODO(lostluck) 2020/02/06: Add a visitor pattern for collecting progress
+// metrics from downstream Nodes.
 type ProgressReportSnapshot struct {
-	ID, Name string
-	Count    int64
-
-	pcol PCollectionSnapshot
+	ID, Name, PID string
+	Count         int64
 }
 
 // Progress returns a snapshot of the source's progress.
@@ -288,7 +255,6 @@ func (n *DataSource) Progress() ProgressReportSnapshot {
 		return ProgressReportSnapshot{}
 	}
 	n.mu.Lock()
-	pcol := n.PCol.snapshot()
 	// The count is the number of "completely processed elements"
 	// which matches the index of the currently processing element.
 	c := n.index
@@ -297,8 +263,7 @@ func (n *DataSource) Progress() ProgressReportSnapshot {
 	if c < 0 {
 		c = 0
 	}
-	pcol.ElementCount = c
-	return ProgressReportSnapshot{ID: n.SID.PtransformID, Name: n.Name, Count: c, pcol: pcol}
+	return ProgressReportSnapshot{PID: n.outputPID, ID: n.SID.PtransformID, Name: n.Name, Count: c}
 }
 
 // Split takes a sorted set of potential split indices, selects and actuates
diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go b/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go
index 3037c84..1ce493c 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go
@@ -201,16 +201,6 @@ func TestDataSource_Iterators(t *testing.T) {
 			if got, want := iVals, expectedKeys; !equalList(got, want) {
 				t.Errorf("DataSource => %#v, want %#v", extractValues(got...), extractValues(want...))
 			}
-
-			// We're using integers that encode to 1 byte, so do some quick math to validate.
-			sizeOfSmallInt := 1
-			snap := quickTestSnapshot(source, int64(len(test.keys)))
-			snap.pcol.SizeSum = int64(len(test.keys) * (1 + len(test.vals)) * sizeOfSmallInt)
-			snap.pcol.SizeMin = int64((1 + len(test.vals)) * sizeOfSmallInt)
-			snap.pcol.SizeMax = int64((1 + len(test.vals)) * sizeOfSmallInt)
-			if got, want := source.Progress(), snap; got != want {
-				t.Errorf("progress didn't match: got %v, want %v", got, want)
-			}
 		})
 	}
 }
@@ -368,6 +358,15 @@ func TestDataSource_Split(t *testing.T) {
 				})
 
 				validateSource(t, out, source, makeValues(test.expected...))
+
+				// Adjust expectations to maximum number of elements.
+				adjustedExpectation := test.splitIdx
+				if adjustedExpectation > int64(len(elements)) {
+					adjustedExpectation = int64(len(elements))
+				}
+				if got, want := source.Progress().Count, adjustedExpectation; got != want {
+					t.Fatalf("progress didn't match split: got %v, want %v", got, want)
+				}
 			})
 		}
 	})
@@ -465,29 +464,13 @@ func constructAndExecutePlanWithContext(t *testing.T, us []Unit, dc DataContext)
 	}
 }
 
-func quickTestSnapshot(source *DataSource, count int64) ProgressReportSnapshot {
-	return ProgressReportSnapshot{
-		Name:  source.Name,
-		ID:    source.SID.PtransformID,
-		Count: count,
-		pcol: PCollectionSnapshot{
-			ElementCount: count,
-			SizeCount:    count,
-			SizeSum:      count,
-			// We're only encoding small ints here, so size will only be 1.
-			SizeMin: 1,
-			SizeMax: 1,
-		},
-	}
-}
-
 func validateSource(t *testing.T, out *CaptureNode, source *DataSource, expected []FullValue) {
 	t.Helper()
 	if got, want := len(out.Elements), len(expected); got != want {
 		t.Fatalf("lengths don't match: got %v, want %v", got, want)
 	}
-	if got, want := source.Progress(), quickTestSnapshot(source, int64(len(expected))); got != want {
-		t.Fatalf("progress snapshot didn't match: got %v, want %v", got, want)
+	if got, want := source.Progress().Count, int64(len(expected)); got != want {
+		t.Fatalf("progress count didn't match: got %v, want %v", got, want)
 	}
 	if !equalList(out.Elements, expected) {
 		t.Errorf("DataSource => %#v, want %#v", extractValues(out.Elements...), extractValues(expected...))
diff --git a/sdks/go/pkg/beam/core/runtime/exec/pcollection.go b/sdks/go/pkg/beam/core/runtime/exec/pcollection.go
deleted file mode 100644
index d8fb024..0000000
--- a/sdks/go/pkg/beam/core/runtime/exec/pcollection.go
+++ /dev/null
@@ -1,153 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements.  See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License.  You may obtain a copy of the License at
-//
-//    http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package exec
-
-import (
-	"context"
-	"fmt"
-	"math"
-	"math/rand"
-	"sync"
-	"sync/atomic"
-
-	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
-)
-
-// PCollection is a passthrough node to collect PCollection metrics, and
-// must be placed as the Out node of any producer of a PCollection.
-//
-// In particular, must not be placed after a Multiplex, and must be placed
-// after a Flatten.
-type PCollection struct {
-	UID    UnitID
-	PColID string
-	Out    Node // Out is the consumer of this PCollection.
-	Coder  *coder.Coder
-	Seed   int64
-
-	r             *rand.Rand
-	nextSampleIdx int64 // The index of the next value to sample.
-	elementCoder  ElementEncoder
-
-	elementCount                         int64 // must use atomic operations.
-	sizeMu                               sync.Mutex
-	sizeCount, sizeSum, sizeMin, sizeMax int64
-}
-
-// ID returns the debug id for this unit.
-func (p *PCollection) ID() UnitID {
-	return p.UID
-}
-
-// Up initializes the random sampling source and element encoder.
-func (p *PCollection) Up(ctx context.Context) error {
-	// dedicated rand source
-	p.r = rand.New(rand.NewSource(p.Seed))
-	p.elementCoder = MakeElementEncoder(p.Coder)
-	return nil
-}
-
-// StartBundle resets collected metrics for this PCollection, and propagates bundle start.
-func (p *PCollection) StartBundle(ctx context.Context, id string, data DataContext) error {
-	atomic.StoreInt64(&p.elementCount, 0)
-	p.nextSampleIdx = p.r.Int63n(3)
-	p.resetSize()
-	return MultiStartBundle(ctx, id, data, p.Out)
-}
-
-type byteCounter struct {
-	count int
-}
-
-func (w *byteCounter) Write(p []byte) (n int, err error) {
-	w.count += len(p)
-	return len(p), nil
-}
-
-// ProcessElement increments the element count and sometimes takes size samples of the elements.
-func (p *PCollection) ProcessElement(ctx context.Context, elm *FullValue, values ...ReStream) error {
-	cur := atomic.AddInt64(&p.elementCount, 1)
-	if cur == p.nextSampleIdx {
-		// We pick the next sampling index based on how large this pcollection already is.
-		// We don't want to necessarily wait until the pcollection has doubled, so we reduce the range.
-		// We don't want to always encode the first consecutive elements, so we add 2 to give some variance.
-		// Finally we add 1 no matter what, so that there's always the potential to trigger again.
-		// Otherwise, there's the potential for the random int to be 0, which means we don't change the
-		// nextSampleIdx at all.
-		p.nextSampleIdx = cur + p.r.Int63n(cur/10+2) + 1
-		var w byteCounter
-		p.elementCoder.Encode(elm, &w)
-		p.addSize(int64(w.count))
-	}
-	return p.Out.ProcessElement(ctx, elm, values...)
-}
-
-func (p *PCollection) addSize(size int64) {
-	p.sizeMu.Lock()
-	defer p.sizeMu.Unlock()
-	p.sizeCount++
-	p.sizeSum += size
-	if size > p.sizeMax {
-		p.sizeMax = size
-	}
-	if size < p.sizeMin {
-		p.sizeMin = size
-	}
-}
-
-func (p *PCollection) resetSize() {
-	p.sizeMu.Lock()
-	defer p.sizeMu.Unlock()
-	p.sizeCount = 0
-	p.sizeSum = 0
-	p.sizeMax = math.MinInt64
-	p.sizeMin = math.MaxInt64
-}
-
-// FinishBundle propagates bundle termination.
-func (p *PCollection) FinishBundle(ctx context.Context) error {
-	return MultiFinishBundle(ctx, p.Out)
-}
-
-// Down is a no-op.
-func (p *PCollection) Down(ctx context.Context) error {
-	return nil
-}
-
-func (p *PCollection) String() string {
-	return fmt.Sprintf("PCollection[%v] Out:%v", p.PColID, IDs(p.Out))
-}
-
-// PCollectionSnapshot contains the PCollectionID
-type PCollectionSnapshot struct {
-	ID           string
-	ElementCount int64
-	// If SizeCount is zero, then no size metrics should be exported.
-	SizeCount, SizeSum, SizeMin, SizeMax int64
-}
-
-func (p *PCollection) snapshot() PCollectionSnapshot {
-	p.sizeMu.Lock()
-	defer p.sizeMu.Unlock()
-	return PCollectionSnapshot{
-		ID:           p.PColID,
-		ElementCount: atomic.LoadInt64(&p.elementCount),
-		SizeCount:    p.sizeCount,
-		SizeSum:      p.sizeSum,
-		SizeMin:      p.sizeMin,
-		SizeMax:      p.sizeMax,
-	}
-}
diff --git a/sdks/go/pkg/beam/core/runtime/exec/pcollection_test.go b/sdks/go/pkg/beam/core/runtime/exec/pcollection_test.go
deleted file mode 100644
index f7f24a7..0000000
--- a/sdks/go/pkg/beam/core/runtime/exec/pcollection_test.go
+++ /dev/null
@@ -1,154 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements.  See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License.  You may obtain a copy of the License at
-//
-//    http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package exec
-
-import (
-	"context"
-	"math"
-	"testing"
-
-	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
-	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/mtime"
-	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/window"
-)
-
-// TestPCollection verifies that the PCollection node works correctly.
-// Seed is by default set to 0, so we have a "deterministic" set of
-// randomness for the samples.
-func TestPCollection(t *testing.T) {
-	a := &CaptureNode{UID: 1}
-	pcol := &PCollection{UID: 2, Out: a, Coder: coder.NewVarInt()}
-	// The "large" 2nd value is to ensure the values are encoded properly,
-	// and that Min & Max are behaving.
-	inputs := []interface{}{int64(1), int64(2000000000), int64(3)}
-	in := &FixedRoot{UID: 3, Elements: makeInput(inputs...), Out: pcol}
-
-	p, err := NewPlan("a", []Unit{a, pcol, in})
-	if err != nil {
-		t.Fatalf("failed to construct plan: %v", err)
-	}
-
-	if err := p.Execute(context.Background(), "1", DataContext{}); err != nil {
-		t.Fatalf("execute failed: %v", err)
-	}
-	if err := p.Down(context.Background()); err != nil {
-		t.Fatalf("down failed: %v", err)
-	}
-
-	expected := makeValues(inputs...)
-	if !equalList(a.Elements, expected) {
-		t.Errorf("multiplex returned %v for a, want %v", extractValues(a.Elements...), extractValues(expected...))
-	}
-	snap := pcol.snapshot()
-	if want, got := int64(len(expected)), snap.ElementCount; got != want {
-		t.Errorf("snapshot miscounted: got %v, want %v", got, want)
-	}
-	checkPCollectionSizeSample(t, snap, 2, 6, 1, 5)
-}
-
-func TestPCollection_sizeReset(t *testing.T) {
-	// Check the initial values after resetting.
-	var pcol PCollection
-	pcol.resetSize()
-	snap := pcol.snapshot()
-	checkPCollectionSizeSample(t, snap, 0, 0, math.MaxInt64, math.MinInt64)
-}
-
-func checkPCollectionSizeSample(t *testing.T, snap PCollectionSnapshot, count, sum, min, max int64) {
-	t.Helper()
-	if want, got := int64(count), snap.SizeCount; got != want {
-		t.Errorf("sample count incorrect: got %v, want %v", got, want)
-	}
-	if want, got := int64(sum), snap.SizeSum; got != want {
-		t.Errorf("sample sum incorrect: got %v, want %v", got, want)
-	}
-	if want, got := int64(min), snap.SizeMin; got != want {
-		t.Errorf("sample min incorrect: got %v, want %v", got, want)
-	}
-	if want, got := int64(max), snap.SizeMax; got != want {
-		t.Errorf("sample max incorrect: got %v, want %v", got, want)
-	}
-}
-
-// BenchmarkPCollection measures the overhead of invoking a ParDo in a plan.
-//
-// On @lostluck's desktop (2020/02/20):
-// BenchmarkPCollection-12                 44699806                24.8 ns/op             0 B/op          0 allocs/op
-func BenchmarkPCollection(b *testing.B) {
-	// Pre allocate the capture buffer and process buffer to avoid
-	// unnecessary overhead.
-	out := &CaptureNode{UID: 1, Elements: make([]FullValue, 0, b.N)}
-	process := make([]MainInput, 0, b.N)
-	for i := 0; i < b.N; i++ {
-		process = append(process, MainInput{Key: FullValue{
-			Windows:   window.SingleGlobalWindow,
-			Timestamp: mtime.ZeroTimestamp,
-			Elm:       int64(1),
-		}})
-	}
-	pcol := &PCollection{UID: 2, Out: out, Coder: coder.NewVarInt()}
-	n := &FixedRoot{UID: 3, Elements: process, Out: pcol}
-	p, err := NewPlan("a", []Unit{n, pcol, out})
-	if err != nil {
-		b.Fatalf("failed to construct plan: %v", err)
-	}
-	b.ResetTimer()
-	if err := p.Execute(context.Background(), "1", DataContext{}); err != nil {
-		b.Fatalf("execute failed: %v", err)
-	}
-	if err := p.Down(context.Background()); err != nil {
-		b.Fatalf("down failed: %v", err)
-	}
-	if got, want := pcol.snapshot().ElementCount, int64(b.N); got != want {
-		b.Errorf("did not process all elements: got %v, want %v", got, want)
-	}
-	if got, want := len(out.Elements), b.N; got != want {
-		b.Errorf("did not process all elements: got %v, want %v", got, want)
-	}
-}
-
-// BenchmarkPCollection_Baseline measures the baseline of the node benchmarking scaffold.
-//
-// On @lostluck's desktop (2020/02/20):
-// BenchmarkPCollection_Baseline-12        62186372                18.8 ns/op             0 B/op          0 allocs/op
-func BenchmarkPCollection_Baseline(b *testing.B) {
-	// Pre allocate the capture buffer and process buffer to avoid
-	// unnecessary overhead.
-	out := &CaptureNode{UID: 1, Elements: make([]FullValue, 0, b.N)}
-	process := make([]MainInput, 0, b.N)
-	for i := 0; i < b.N; i++ {
-		process = append(process, MainInput{Key: FullValue{
-			Windows:   window.SingleGlobalWindow,
-			Timestamp: mtime.ZeroTimestamp,
-			Elm:       1,
-		}})
-	}
-	n := &FixedRoot{UID: 3, Elements: process, Out: out}
-	p, err := NewPlan("a", []Unit{n, out})
-	if err != nil {
-		b.Fatalf("failed to construct plan: %v", err)
-	}
-	b.ResetTimer()
-	if err := p.Execute(context.Background(), "1", DataContext{}); err != nil {
-		b.Fatalf("execute failed: %v", err)
-	}
-	if err := p.Down(context.Background()); err != nil {
-		b.Fatalf("down failed: %v", err)
-	}
-	if got, want := len(out.Elements), b.N; got != want {
-		b.Errorf("did not process all elements: got %v, want %v", got, want)
-	}
-}
diff --git a/sdks/go/pkg/beam/core/runtime/exec/plan.go b/sdks/go/pkg/beam/core/runtime/exec/plan.go
index 5275092..fde9e7c 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/plan.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/plan.go
@@ -21,7 +21,9 @@ import (
 	"context"
 	"fmt"
 	"strings"
+	"sync"
 
+	"github.com/apache/beam/sdks/go/pkg/beam/core/metrics"
 	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
@@ -29,22 +31,34 @@ import (
 // from a part of a pipeline. A plan can be used to process multiple bundles
 // serially.
 type Plan struct {
-	id    string // id of the bundle descriptor for this plan
-	roots []Root
-	units []Unit
-	pcols []*PCollection
+	id       string
+	roots    []Root
+	units    []Unit
+	parDoIDs []string
 
 	status Status
 
+	// While the store is threadsafe, the reference to it
+	// is not, so we need to protect the store field to be
+	// able to asynchronously provide tentative metrics.
+	storeMu sync.Mutex
+	store   *metrics.Store
+
 	// TODO: there can be more than 1 DataSource in a bundle.
 	source *DataSource
 }
 
+// hasPID provides a common interface for extracting PTransformIDs
+// from Units.
+type hasPID interface {
+	GetPID() string
+}
+
 // NewPlan returns a new bundle execution plan from the given units.
 func NewPlan(id string, units []Unit) (*Plan, error) {
 	var roots []Root
-	var pcols []*PCollection
 	var source *DataSource
+	var pardoIDs []string
 
 	for _, u := range units {
 		if u == nil {
@@ -56,8 +70,8 @@ func NewPlan(id string, units []Unit) (*Plan, error) {
 		if s, ok := u.(*DataSource); ok {
 			source = s
 		}
-		if p, ok := u.(*PCollection); ok {
-			pcols = append(pcols, p)
+		if p, ok := u.(hasPID); ok {
+			pardoIDs = append(pardoIDs, p.GetPID())
 		}
 	}
 	if len(roots) == 0 {
@@ -65,12 +79,12 @@ func NewPlan(id string, units []Unit) (*Plan, error) {
 	}
 
 	return &Plan{
-		id:     id,
-		status: Initializing,
-		roots:  roots,
-		units:  units,
-		pcols:  pcols,
-		source: source,
+		id:       id,
+		status:   Initializing,
+		roots:    roots,
+		units:    units,
+		parDoIDs: pardoIDs,
+		source:   source,
 	}, nil
 }
 
@@ -88,6 +102,10 @@ func (p *Plan) SourcePTransformID() string {
 // are brought up on the first execution. If a bundle fails, the plan cannot
 // be reused for further bundles. Does not panic. Blocking.
 func (p *Plan) Execute(ctx context.Context, id string, manager DataContext) error {
+	ctx = metrics.SetBundleID(ctx, p.id)
+	p.storeMu.Lock()
+	p.store = metrics.GetStore(ctx)
+	p.storeMu.Unlock()
 	if p.status == Initializing {
 		for _, u := range p.units {
 			if err := callNoPanic(ctx, u.Up); err != nil {
@@ -160,28 +178,19 @@ func (p *Plan) String() string {
 	return fmt.Sprintf("Plan[%v]:\n%v", p.ID(), strings.Join(units, "\n"))
 }
 
-// PlanSnapshot contains system metrics for the current run of the plan.
-type PlanSnapshot struct {
-	Source ProgressReportSnapshot
-	PCols  []PCollectionSnapshot
-}
-
-// Progress returns a snapshot of progress of the plan, and associated metrics. 
-// The retuend boolean indicates whether the plan includes a DataSource, which is
-// important for handling legacy metrics. This boolean will be removed once
-// we no longer return legacy metrics.
-func (p *Plan) Progress() (PlanSnapshot, bool) {
-	pcolSnaps := make([]PCollectionSnapshot, 0, len(p.pcols)+1) // include space for the datasource pcollection.
-	for _, pcol := range p.pcols {
-		pcolSnaps = append(pcolSnaps, pcol.snapshot())
-	}
-	snap := PlanSnapshot{PCols: pcolSnaps}
+// Progress returns a snapshot of input progress of the plan, and associated metrics.
+func (p *Plan) Progress() (ProgressReportSnapshot, bool) {
 	if p.source != nil {
-		snap.Source = p.source.Progress()
-		snap.PCols = append(pcolSnaps, snap.Source.pcol)
-		return snap, true
+		return p.source.Progress(), true
 	}
-	return snap, false
+	return ProgressReportSnapshot{}, false
+}
+
+// Store returns the metric store for the last use of this plan.
+func (p *Plan) Store() *metrics.Store {
+	p.storeMu.Lock()
+	defer p.storeMu.Unlock()
+	return p.store
 }
 
 // SplitPoints captures the split requested by the Runner.
diff --git a/sdks/go/pkg/beam/core/runtime/exec/translate.go b/sdks/go/pkg/beam/core/runtime/exec/translate.go
index b629560..486dacf 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/translate.go
@@ -17,7 +17,6 @@ package exec
 
 import (
 	"fmt"
-	"math/rand"
 	"strconv"
 	"strings"
 
@@ -78,16 +77,12 @@ func UnmarshalPlan(desc *fnpb.ProcessBundleDescriptor) (*Plan, error) {
 		for key, pid := range transform.GetOutputs() {
 			u.SID = StreamID{PtransformID: id, Port: port}
 			u.Name = key
+			u.outputPID = pid
 
 			u.Out, err = b.makePCollection(pid)
 			if err != nil {
 				return nil, err
 			}
-			// Elide the PCollection Node for DataSources.
-			// DataSources can get byte samples directly, and can handle CoGBKs.
-			u.PCol = *u.Out.(*PCollection)
-			u.Out = u.PCol.Out
-			b.units = b.units[:len(b.units)-1]
 		}
 
 		b.units = append(b.units, u)
@@ -103,8 +98,8 @@ type builder struct {
 	succ map[string][]linkID // PCollectionID -> []linkID
 
 	windowing map[string]*window.WindowingStrategy
-	nodes     map[string]*PCollection // PCollectionID -> Node (cache)
-	links     map[linkID]Node         // linkID -> Node (cache)
+	nodes     map[string]Node // PCollectionID -> Node (cache)
+	links     map[linkID]Node // linkID -> Node (cache)
 
 	units []Unit // result
 	idgen *GenID
@@ -146,7 +141,7 @@ func newBuilder(desc *fnpb.ProcessBundleDescriptor) (*builder, error) {
 		succ: succ,
 
 		windowing: make(map[string]*window.WindowingStrategy),
-		nodes:     make(map[string]*PCollection),
+		nodes:     make(map[string]Node),
 		links:     make(map[linkID]Node),
 
 		idgen: &GenID{},
@@ -263,7 +258,7 @@ func (b *builder) makeCoderForPCollection(id string) (*coder.Coder, *coder.Windo
 	return c, wc, nil
 }
 
-func (b *builder) makePCollection(id string) (*PCollection, error) {
+func (b *builder) makePCollection(id string) (Node, error) {
 	if n, exists := b.nodes[id]; exists {
 		return n, nil
 	}
@@ -278,11 +273,8 @@ func (b *builder) makePCollection(id string) (*PCollection, error) {
 		u = &Discard{UID: b.idgen.New()}
 
 	case 1:
-		out, err := b.makeLink(id, list[0])
-		if err != nil {
-			return nil, err
-		}
-		return b.newPCollectionNode(id, out)
+		return b.makeLink(id, list[0])
+
 	default:
 		// Multiplex.
 
@@ -299,16 +291,7 @@ func (b *builder) makePCollection(id string) (*PCollection, error) {
 		b.units = append(b.units, u)
 		u = &Flatten{UID: b.idgen.New(), N: count, Out: u}
 	}
-	b.units = append(b.units, u)
-	return b.newPCollectionNode(id, u)
-}
 
-func (b *builder) newPCollectionNode(id string, out Node) (*PCollection, error) {
-	ec, _, err := b.makeCoderForPCollection(id)
-	if err != nil {
-		return nil, err
-	}
-	u := &PCollection{UID: b.idgen.New(), Out: out, PColID: id, Coder: ec, Seed: rand.Int63()}
 	b.nodes[id] = u
 	b.units = append(b.units, u)
 	return u, nil
diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness.go b/sdks/go/pkg/beam/core/runtime/harness/harness.go
index 2ee8beb..c2fce51 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/harness.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/harness.go
@@ -23,7 +23,6 @@ import (
 	"sync"
 	"time"
 
-	"github.com/apache/beam/sdks/go/pkg/beam/core/metrics"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/util/hooks"
 	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
@@ -83,12 +82,11 @@ func Main(ctx context.Context, loggingEndpoint, controlEndpoint string) error {
 	}()
 
 	ctrl := &control{
-		plans:    make(map[bundleDescriptorID]*exec.Plan),
-		active:   make(map[instructionID]*exec.Plan),
-		metStore: make(map[instructionID]*metrics.Store),
-		failed:   make(map[instructionID]error),
-		data:     &DataChannelManager{},
-		state:    &StateChannelManager{},
+		plans:  make(map[bundleDescriptorID]*exec.Plan),
+		active: make(map[instructionID]*exec.Plan),
+		failed: make(map[instructionID]error),
+		data:   &DataChannelManager{},
+		state:  &StateChannelManager{},
 	}
 
 	// gRPC requires all readers of a stream be the same goroutine, so this goroutine
@@ -144,8 +142,6 @@ type control struct {
 	// plans that are actively being executed.
 	// a plan can only be in one of these maps at any time.
 	active map[instructionID]*exec.Plan // protected by mu
-	// metric stores for active plans.
-	metStore map[instructionID]*metrics.Store // protected by mu
 	// plans that have failed during execution
 	failed map[instructionID]error // protected by mu
 	mu     sync.Mutex
@@ -196,10 +192,6 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe
 		// since a plan can't be run concurrently.
 		c.active[instID] = plan
 		delete(c.plans, bdID)
-		// Get the user metrics store for this bundle.
-		ctx = metrics.SetBundleID(ctx, string(instID))
-		store := metrics.GetStore(ctx)
-		c.metStore[instID] = store
 		c.mu.Unlock()
 
 		if !ok {
@@ -212,7 +204,7 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe
 		data.Close()
 		state.Close()
 
-		mets, mons := monitoring(plan, store)
+		mets, mons := monitoring(plan)
 		// Move the plan back to the candidate state
 		c.mu.Lock()
 		// Mark the instruction as failed.
@@ -221,7 +213,6 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe
 		}
 		c.plans[bdID] = plan
 		delete(c.active, instID)
-		delete(c.metStore, instID)
 		c.mu.Unlock()
 
 		if err != nil {
@@ -245,7 +236,6 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe
 		ref := instructionID(msg.GetInstructionId())
 		c.mu.Lock()
 		plan, ok := c.active[ref]
-		store, _ := c.metStore[ref]
 		err := c.failed[ref]
 		c.mu.Unlock()
 		if err != nil {
@@ -255,7 +245,7 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe
 			return fail(ctx, instID, "failed to return progress: instruction %v not active", ref)
 		}
 
-		mets, mons := monitoring(plan, store)
+		mets, mons := monitoring(plan)
 
 		return &fnpb.InstructionResponse{
 			InstructionId: string(instID),
diff --git a/sdks/go/pkg/beam/core/runtime/harness/monitoring.go b/sdks/go/pkg/beam/core/runtime/harness/monitoring.go
index b2bc35d..14162dd 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/monitoring.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/monitoring.go
@@ -25,7 +25,11 @@ import (
 	"github.com/golang/protobuf/ptypes"
 )
 
-func monitoring(p *exec.Plan, store *metrics.Store) (*fnpb.Metrics, []*ppb.MonitoringInfo) {
+func monitoring(p *exec.Plan) (*fnpb.Metrics, []*ppb.MonitoringInfo) {
+	store := p.Store()
+	if store == nil {
+		return nil, nil
+	}
 	// Get the legacy style metrics.
 	transforms := make(map[string]*fnpb.Metrics_PTransform)
 	metrics.Extractor{
@@ -110,44 +114,27 @@ func monitoring(p *exec.Plan, store *metrics.Store) (*fnpb.Metrics, []*ppb.Monit
 	}.ExtractFrom(store)
 
 	// Get the execution monitoring information from the bundle plan.
-	snapshot, hasSource := p.Progress()
-	if hasSource {
+	if snapshot, ok := p.Progress(); ok {
 		// Legacy version.
-		transforms[snapshot.Source.ID] = &fnpb.Metrics_PTransform{
+		transforms[snapshot.ID] = &fnpb.Metrics_PTransform{
 			ProcessedElements: &fnpb.Metrics_PTransform_ProcessedElements{
 				Measured: &fnpb.Metrics_PTransform_Measured{
 					OutputElementCounts: map[string]int64{
-						snapshot.Source.Name: snapshot.Source.Count,
+						snapshot.Name: snapshot.Count,
 					},
 				},
 			},
 		}
-	}
-
-	// Monitoring info version.
-	for _, pcol := range snapshot.PCols {
+		// Monitoring info version.
 		monitoringInfo = append(monitoringInfo,
 			&ppb.MonitoringInfo{
 				Urn:  "beam:metric:element_count:v1",
 				Type: "beam:metrics:sum_int_64",
 				Labels: map[string]string{
-					"PCOLLECTION": pcol.ID,
+					"PCOLLECTION": snapshot.PID,
 				},
-				Data: int64Counter(pcol.ElementCount),
+				Data: int64Counter(snapshot.Count),
 			})
-
-		// Skip pcollections without size
-		if pcol.SizeCount != 0 {
-			monitoringInfo = append(monitoringInfo,
-				&ppb.MonitoringInfo{
-					Urn:  "beam:metric:sampled_byte_size:v1",
-					Type: "beam:metrics:distribution_int_64",
-					Labels: map[string]string{
-						"PCOLLECTION": pcol.ID,
-					},
-					Data: int64Distribution(pcol.SizeCount, pcol.SizeSum, pcol.SizeMin, pcol.SizeMax),
-				})
-		}
 	}
 
 	return &fnpb.Metrics{
diff --git a/sdks/go/pkg/beam/runners/direct/direct.go b/sdks/go/pkg/beam/runners/direct/direct.go
index 4d78dc8..eface25 100644
--- a/sdks/go/pkg/beam/runners/direct/direct.go
+++ b/sdks/go/pkg/beam/runners/direct/direct.go
@@ -46,7 +46,6 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
 
 	log.Info(ctx, "Pipeline:")
 	log.Info(ctx, p)
-	ctx = metrics.SetBundleID(ctx, "direct") // Ensure a metrics.Store exists.
 
 	if *jobopts.Strict {
 		log.Info(ctx, "Strict mode enabled, applying additional validation.")
@@ -75,7 +74,7 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
 	}
 	// TODO(lostluck) 2020/01/24: What's the right way to expose the
 	// metrics store for the direct runner?
-	metrics.DumpToLog(ctx)
+	metrics.DumpToLogFromStore(ctx, plan.Store())
 	return nil
 }