You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/05/11 16:16:38 UTC

[GitHub] [arrow] raceordie690 opened a new pull request, #13120: Added concurrency in key places that are always serial, regardless if parallel=true or not

raceordie690 opened a new pull request, #13120:
URL: https://github.com/apache/arrow/pull/13120

   added concurrency to field readers.  Even when parallel=true, there a…re times when default behavior is serial which causes very slow performance when dealing with many columns and structures with many columns.
   
   I'm working with very complex parquet files that have  500+ columns and lists of structures with 100's of columns. In the original code, getting the field readers is always done serially regardless if parallel is true.  This is also true when the readers retrieve 'next batch' of records.  I modified the code to perform concurrent 'read' operations in three places in two files.  The performance impact is especially heavy on high-latency files, e.g., cloud storage.
   
   The original version required just over an hour to read 600+ columns from GCS.  The revised version completes the same read in ~ 11 minutes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] zeroshade closed pull request #13120: ARROW-16530: [Go] Added concurrency in key places that are always serial, regardless if parallel=true or not

Posted by GitBox <gi...@apache.org>.
zeroshade closed pull request #13120: ARROW-16530: [Go] Added concurrency in key places that are always serial, regardless if parallel=true or not
URL: https://github.com/apache/arrow/pull/13120


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] raceordie690 commented on pull request #13120: ARROW-16530: [Go] Added concurrency in key places that are always serial, regardless if parallel=true or not

Posted by GitBox <gi...@apache.org>.
raceordie690 commented on PR #13120:
URL: https://github.com/apache/arrow/pull/13120#issuecomment-1126737153

   What's the official position on generics?  Too soon?  
   
   I'm making the requested changes this A.M.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] zeroshade commented on pull request #13120: ARROW-16530: [Go] Added concurrency in key places that are always serial, regardless if parallel=true or not

Posted by GitBox <gi...@apache.org>.
zeroshade commented on PR #13120:
URL: https://github.com/apache/arrow/pull/13120#issuecomment-1126786108

   @raceordie690 if you rebase to pull in the commit I just merged, it should alleviate the flakey test


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] zeroshade commented on a diff in pull request #13120: ARROW-16530: [Go] Added concurrency in key places that are always serial, regardless if parallel=true or not

Posted by GitBox <gi...@apache.org>.
zeroshade commented on code in PR #13120:
URL: https://github.com/apache/arrow/pull/13120#discussion_r871664930


##########
go/parquet/pqarrow/file_reader.go:
##########
@@ -202,14 +211,35 @@ func (fr *FileReader) GetFieldReaders(ctx context.Context, colIndices, rowGroups
 
 	out := make([]*ColumnReader, len(fieldIndices))
 	outFields := make([]arrow.Field, len(fieldIndices))
-	for idx, fidx := range fieldIndices {
-		rdr, err := fr.GetFieldReader(ctx, fidx, includedLeaves, rowGroups)
-		if err != nil {
-			return nil, nil, err
+
+	//* Read First error from errchan and break only capturing first error
+	go func() {
+		for err = range errchan {
+			break
 		}
+	}()
 
-		outFields[idx] = *rdr.Field()
-		out[idx] = rdr
+	// GetFieldReader causes read operations, when issued serially on large numbers of columns,
+	// this is super time consuming. Get field readers concurrently.
+	wg.Add(len(fieldIndices))

Review Comment:
   Same as before, we should respect any properties dictating limitations on the parallelizing instead of automatically doing it for *all* field indices.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] zeroshade commented on a diff in pull request #13120: ARROW-16530: [Go] Added concurrency in key places that are always serial, regardless if parallel=true or not

Posted by GitBox <gi...@apache.org>.
zeroshade commented on code in PR #13120:
URL: https://github.com/apache/arrow/pull/13120#discussion_r871664164


##########
go/parquet/pqarrow/column_readers.go:
##########
@@ -216,12 +217,34 @@ func (sr *structReader) GetRepLevels() ([]int16, error) {
 }
 
 func (sr *structReader) LoadBatch(nrecords int64) error {
-	for _, rdr := range sr.children {
-		if err := rdr.LoadBatch(nrecords); err != nil {
-			return err
+	var (
+		// REP -- Load batches in parallel
+		// When reading structs with large numbers of columns, the serial load is very slow.
+		// This is especially true when reading Cloud Storage. Loading concurrently
+		// greatly improves performance.
+		wg      sync.WaitGroup
+		errchan chan error = make(chan error)
+		err     error
+	)
+
+	//* Read First error from errchan and break only capturing first error
+	go func() {
+		for err = range errchan {
+			break
 		}
+	}()
+	wg.Add(len(sr.children))
+	for _, rdr := range sr.children {
+		go func(r *ColumnReader) {
+			defer wg.Done()
+			if err := r.LoadBatch(nrecords); err != nil {
+				errchan <- err
+			}
+		}(rdr)
 	}
-	return nil
+	wg.Wait() // wait for reads to complete
+	close(errchan)

Review Comment:
   Might be worthwhile to create a helper for this pattern rather than repeat the pattern over and over again. 
   
   We should probably also respect the read properties dictating any limitation on the amount of parallelizing



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] ursabot commented on pull request #13120: ARROW-16530: [Go] Added concurrency in key places that are always serial, regardless if parallel=true or not

Posted by GitBox <gi...@apache.org>.
ursabot commented on PR #13120:
URL: https://github.com/apache/arrow/pull/13120#issuecomment-1129181684

   Benchmark runs are scheduled for baseline = c032290b9ea2699ce29f4fa26e6826911e13fcca and contender = 5f56450f5acee2bd882a5c7c58fa5ac64d6a5acb. 5f56450f5acee2bd882a5c7c58fa5ac64d6a5acb is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
   Conbench compare runs links:
   [Finished :arrow_down:0.0% :arrow_up:0.0%] [ec2-t3-xlarge-us-east-2](https://conbench.ursa.dev/compare/runs/fa08991e35424cb7a6e21757bcafcd10...84de99c9b3784882b375238fc8c9f17f/)
   [Finished :arrow_down:1.13% :arrow_up:0.0%] [test-mac-arm](https://conbench.ursa.dev/compare/runs/4f95789754a24c6e992f12429f13049a...f6a22d8d6682486daa8593532fbee1f6/)
   [Finished :arrow_down:0.36% :arrow_up:0.0%] [ursa-i9-9960x](https://conbench.ursa.dev/compare/runs/d7dd40c9646f43f2a852a9b44fa1e345...a6b2b7b3fba5458d99e0d40d70e7e0f8/)
   [Finished :arrow_down:0.04% :arrow_up:0.08%] [ursa-thinkcentre-m75q](https://conbench.ursa.dev/compare/runs/365143f837834c96ad5a985dd4fc477c...e9c5c33602d548b6a798d2beb489424c/)
   Buildkite builds:
   [Finished] [`5f56450f` ec2-t3-xlarge-us-east-2](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ec2-t3-xlarge-us-east-2/builds/775)
   [Finished] [`5f56450f` test-mac-arm](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-test-mac-arm/builds/772)
   [Finished] [`5f56450f` ursa-i9-9960x](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ursa-i9-9960x/builds/762)
   [Finished] [`5f56450f` ursa-thinkcentre-m75q](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ursa-thinkcentre-m75q/builds/778)
   [Finished] [`c032290b` ec2-t3-xlarge-us-east-2](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ec2-t3-xlarge-us-east-2/builds/774)
   [Finished] [`c032290b` test-mac-arm](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-test-mac-arm/builds/771)
   [Finished] [`c032290b` ursa-i9-9960x](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ursa-i9-9960x/builds/761)
   [Finished] [`c032290b` ursa-thinkcentre-m75q](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ursa-thinkcentre-m75q/builds/777)
   Supported benchmarks:
   ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
   test-mac-arm: Supported benchmark langs: C++, Python, R
   ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
   ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] github-actions[bot] commented on pull request #13120: Added concurrency in key places that are always serial, regardless if parallel=true or not

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #13120:
URL: https://github.com/apache/arrow/pull/13120#issuecomment-1123982147

   <!--
     Licensed to the Apache Software Foundation (ASF) under one
     or more contributor license agreements.  See the NOTICE file
     distributed with this work for additional information
     regarding copyright ownership.  The ASF licenses this file
     to you under the Apache License, Version 2.0 (the
     "License"); you may not use this file except in compliance
     with the License.  You may obtain a copy of the License at
   
       http://www.apache.org/licenses/LICENSE-2.0
   
     Unless required by applicable law or agreed to in writing,
     software distributed under the License is distributed on an
     "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
     KIND, either express or implied.  See the License for the
     specific language governing permissions and limitations
     under the License.
   -->
   
   Thanks for opening a pull request!
   
   If this is not a [minor PR](https://github.com/apache/arrow/blob/master/CONTRIBUTING.md#Minor-Fixes). Could you open an issue for this pull request on JIRA? https://issues.apache.org/jira/browse/ARROW
   
   Opening JIRAs ahead of time contributes to the [Openness](http://theapacheway.com/open/#:~:text=Openness%20allows%20new%20users%20the,must%20happen%20in%20the%20open.) of the Apache Arrow project.
   
   Then could you also rename pull request title in the following format?
   
       ARROW-${JIRA_ID}: [${COMPONENT}] ${SUMMARY}
   
   or
   
       MINOR: [${COMPONENT}] ${SUMMARY}
   
   See also:
   
     * [Other pull requests](https://github.com/apache/arrow/pulls/)
     * [Contribution Guidelines - How to contribute patches](https://arrow.apache.org/docs/developers/contributing.html#how-to-contribute-patches)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] zeroshade commented on pull request #13120: ARROW-16530: [Go] Added concurrency in key places that are always serial, regardless if parallel=true or not

Posted by GitBox <gi...@apache.org>.
zeroshade commented on PR #13120:
URL: https://github.com/apache/arrow/pull/13120#issuecomment-1127714980

   @raceordie690 Looks great, once all the tests pass I'll merge it in. Thanks so much!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] raceordie690 commented on a diff in pull request #13120: ARROW-16530: [Go] Added concurrency in key places that are always serial, regardless if parallel=true or not

Posted by GitBox <gi...@apache.org>.
raceordie690 commented on code in PR #13120:
URL: https://github.com/apache/arrow/pull/13120#discussion_r871798359


##########
go/parquet/pqarrow/column_readers.go:
##########
@@ -216,12 +217,34 @@ func (sr *structReader) GetRepLevels() ([]int16, error) {
 }
 
 func (sr *structReader) LoadBatch(nrecords int64) error {
-	for _, rdr := range sr.children {
-		if err := rdr.LoadBatch(nrecords); err != nil {
-			return err
+	var (
+		// REP -- Load batches in parallel

Review Comment:
   Sorry, it's my initials.  I use it to mark places I'm working.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] zeroshade commented on a diff in pull request #13120: ARROW-16530: [Go] Added concurrency in key places that are always serial, regardless if parallel=true or not

Posted by GitBox <gi...@apache.org>.
zeroshade commented on code in PR #13120:
URL: https://github.com/apache/arrow/pull/13120#discussion_r871661503


##########
go/parquet/pqarrow/column_readers.go:
##########
@@ -216,12 +217,34 @@ func (sr *structReader) GetRepLevels() ([]int16, error) {
 }
 
 func (sr *structReader) LoadBatch(nrecords int64) error {
-	for _, rdr := range sr.children {
-		if err := rdr.LoadBatch(nrecords); err != nil {
-			return err
+	var (
+		// REP -- Load batches in parallel

Review Comment:
   what is `REP`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] github-actions[bot] commented on pull request #13120: ARROW-16530: go Added concurrency in key places that are always serial, regardless if parallel=true or not

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #13120:
URL: https://github.com/apache/arrow/pull/13120#issuecomment-1124012958

   https://issues.apache.org/jira/browse/ARROW-16530


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] zeroshade commented on a diff in pull request #13120: ARROW-16530: [Go] Added concurrency in key places that are always serial, regardless if parallel=true or not

Posted by GitBox <gi...@apache.org>.
zeroshade commented on code in PR #13120:
URL: https://github.com/apache/arrow/pull/13120#discussion_r873052505


##########
go/parquet/pqarrow/column_readers.go:
##########
@@ -218,32 +222,38 @@ func (sr *structReader) GetRepLevels() ([]int16, error) {
 
 func (sr *structReader) LoadBatch(nrecords int64) error {
 	var (
-		// REP -- Load batches in parallel
+		// Load batches in parallel
 		// When reading structs with large numbers of columns, the serial load is very slow.
 		// This is especially true when reading Cloud Storage. Loading concurrently
 		// greatly improves performance.
-		wg      sync.WaitGroup
-		errchan chan error = make(chan error)
-		err     error
+		wg   sync.WaitGroup
+		np   int = 1 // default to serial
+		sem  chan interface{}
+		errs ErrBuffer
+		err  error
 	)
 
-	//* Read First error from errchan and break only capturing first error
-	go func() {
-		for err = range errchan {
-			break
-		}
-	}()
+	if sr.props.Parallel {
+		np = len(sr.children)
+	}
+	sem = make(chan interface{}, np)
 	wg.Add(len(sr.children))
 	for _, rdr := range sr.children {
+		sem <- nil // Acquire
 		go func(r *ColumnReader) {
 			defer wg.Done()
+			defer func() { <-sem }() // release
 			if err := r.LoadBatch(nrecords); err != nil {
-				errchan <- err
+				errs.Append(err)
 			}
 		}(rdr)
 	}
 	wg.Wait() // wait for reads to complete
-	close(errchan)

Review Comment:
   same comment applies to the other spots too.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] raceordie690 commented on pull request #13120: ARROW-16530: [Go] Added concurrency in key places that are always serial, regardless if parallel=true or not

Posted by GitBox <gi...@apache.org>.
raceordie690 commented on PR #13120:
URL: https://github.com/apache/arrow/pull/13120#issuecomment-1126773856

   > @raceordie690 For now we are not adopting the generics.
   > 
   > The standard recommendation is to support latest version - 1, so we want to maintain support for Go1.17 for now. When Go 1.19 is released, we can bump to Go1.18 and begin to use Generics.
   
   Yeah, I knew when I wrote my comment!  The go.mod specifies Go 1.15. It's very minor.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] raceordie690 commented on a diff in pull request #13120: ARROW-16530: [Go] Added concurrency in key places that are always serial, regardless if parallel=true or not

Posted by GitBox <gi...@apache.org>.
raceordie690 commented on code in PR #13120:
URL: https://github.com/apache/arrow/pull/13120#discussion_r873056518


##########
go/parquet/pqarrow/column_readers.go:
##########
@@ -218,32 +222,38 @@ func (sr *structReader) GetRepLevels() ([]int16, error) {
 
 func (sr *structReader) LoadBatch(nrecords int64) error {
 	var (
-		// REP -- Load batches in parallel
+		// Load batches in parallel
 		// When reading structs with large numbers of columns, the serial load is very slow.
 		// This is especially true when reading Cloud Storage. Loading concurrently
 		// greatly improves performance.
-		wg      sync.WaitGroup
-		errchan chan error = make(chan error)
-		err     error
+		wg   sync.WaitGroup
+		np   int = 1 // default to serial
+		sem  chan interface{}
+		errs ErrBuffer
+		err  error
 	)
 
-	//* Read First error from errchan and break only capturing first error
-	go func() {
-		for err = range errchan {
-			break
-		}
-	}()
+	if sr.props.Parallel {
+		np = len(sr.children)
+	}
+	sem = make(chan interface{}, np)
 	wg.Add(len(sr.children))
 	for _, rdr := range sr.children {
+		sem <- nil // Acquire
 		go func(r *ColumnReader) {
 			defer wg.Done()
+			defer func() { <-sem }() // release
 			if err := r.LoadBatch(nrecords); err != nil {
-				errchan <- err
+				errs.Append(err)
 			}
 		}(rdr)
 	}
 	wg.Wait() // wait for reads to complete
-	close(errchan)

Review Comment:
   That's much better. Thx!  I write tons of concurrency code and never came across that.  Though I do use the semaphore code from x/sync often.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] zeroshade commented on a diff in pull request #13120: ARROW-16530: [Go] Added concurrency in key places that are always serial, regardless if parallel=true or not

Posted by GitBox <gi...@apache.org>.
zeroshade commented on code in PR #13120:
URL: https://github.com/apache/arrow/pull/13120#discussion_r873058055


##########
go/parquet/pqarrow/column_readers.go:
##########
@@ -216,12 +221,28 @@ func (sr *structReader) GetRepLevels() ([]int16, error) {
 }
 
 func (sr *structReader) LoadBatch(nrecords int64) error {
+	var (
+		np int = 1 // default to serial
+	)
+	// Load batches in parallel
+	// When reading structs with large numbers of columns, the serial load is very slow.
+	// This is especially true when reading Cloud Storage. Loading concurrently
+	// greatly improves performance.
+	if sr.props.Parallel {
+		np = len(sr.children)
+	}
+	g := new(errgroup.Group)
+	g.SetLimit(np)
 	for _, rdr := range sr.children {
-		if err := rdr.LoadBatch(nrecords); err != nil {
-			return err
-		}
+		func(r *ColumnReader) {
+			g.Go(func() error {
+				err := r.LoadBatch(nrecords)
+				return err
+			})
+		}(rdr)

Review Comment:
   you don't need the wrapping function, you can just do:
   `r := rdr` before the call to `g.Go` to have the same effect by creating an iteration-local variable. It's cleaner and cheaper than the enclosing function.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] zeroshade commented on a diff in pull request #13120: ARROW-16530: [Go] Added concurrency in key places that are always serial, regardless if parallel=true or not

Posted by GitBox <gi...@apache.org>.
zeroshade commented on code in PR #13120:
URL: https://github.com/apache/arrow/pull/13120#discussion_r873052453


##########
go/parquet/pqarrow/column_readers.go:
##########
@@ -218,32 +222,38 @@ func (sr *structReader) GetRepLevels() ([]int16, error) {
 
 func (sr *structReader) LoadBatch(nrecords int64) error {
 	var (
-		// REP -- Load batches in parallel
+		// Load batches in parallel
 		// When reading structs with large numbers of columns, the serial load is very slow.
 		// This is especially true when reading Cloud Storage. Loading concurrently
 		// greatly improves performance.
-		wg      sync.WaitGroup
-		errchan chan error = make(chan error)
-		err     error
+		wg   sync.WaitGroup
+		np   int = 1 // default to serial
+		sem  chan interface{}
+		errs ErrBuffer
+		err  error
 	)
 
-	//* Read First error from errchan and break only capturing first error
-	go func() {
-		for err = range errchan {
-			break
-		}
-	}()
+	if sr.props.Parallel {
+		np = len(sr.children)
+	}
+	sem = make(chan interface{}, np)
 	wg.Add(len(sr.children))
 	for _, rdr := range sr.children {
+		sem <- nil // Acquire
 		go func(r *ColumnReader) {
 			defer wg.Done()
+			defer func() { <-sem }() // release
 			if err := r.LoadBatch(nrecords); err != nil {
-				errchan <- err
+				errs.Append(err)
 			}
 		}(rdr)
 	}
 	wg.Wait() // wait for reads to complete
-	close(errchan)

Review Comment:
   Interesting approach using a channel as a semaphore. 
   
   I think we should try using `golang.org/x/sync/errgroup`. Using that would simplify the code quite a bit:
   
   ```go
   g := new(errgroup.Group)
   g.SetLimit(np)
   for _, rdr := range sr.children {
           rdr := rdr
           g.Go(func() error { return rdr.LoadBatch(nrecords) })
   }
   
   return g.Wait()
   ```
   
   I believe this would work correctly. Thoughts?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] raceordie690 commented on a diff in pull request #13120: ARROW-16530: [Go] Added concurrency in key places that are always serial, regardless if parallel=true or not

Posted by GitBox <gi...@apache.org>.
raceordie690 commented on code in PR #13120:
URL: https://github.com/apache/arrow/pull/13120#discussion_r873052929


##########
go/parquet/pqarrow/column_readers.go:
##########
@@ -218,32 +222,38 @@ func (sr *structReader) GetRepLevels() ([]int16, error) {
 
 func (sr *structReader) LoadBatch(nrecords int64) error {
 	var (
-		// REP -- Load batches in parallel
+		// Load batches in parallel
 		// When reading structs with large numbers of columns, the serial load is very slow.
 		// This is especially true when reading Cloud Storage. Loading concurrently
 		// greatly improves performance.
-		wg      sync.WaitGroup
-		errchan chan error = make(chan error)
-		err     error
+		wg   sync.WaitGroup
+		np   int = 1 // default to serial
+		sem  chan interface{}
+		errs ErrBuffer
+		err  error
 	)
 
-	//* Read First error from errchan and break only capturing first error
-	go func() {
-		for err = range errchan {
-			break
-		}
-	}()
+	if sr.props.Parallel {
+		np = len(sr.children)
+	}
+	sem = make(chan interface{}, np)
 	wg.Add(len(sr.children))
 	for _, rdr := range sr.children {
+		sem <- nil // Acquire
 		go func(r *ColumnReader) {
 			defer wg.Done()
+			defer func() { <-sem }() // release
 			if err := r.LoadBatch(nrecords); err != nil {
-				errchan <- err
+				errs.Append(err)
 			}
 		}(rdr)
 	}
 	wg.Wait() // wait for reads to complete
-	close(errchan)

Review Comment:
   I like that, a lot!  Never seen it.  I'll revise.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] github-actions[bot] commented on pull request #13120: ARROW-16530: go Added concurrency in key places that are always serial, regardless if parallel=true or not

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #13120:
URL: https://github.com/apache/arrow/pull/13120#issuecomment-1124012975

   :warning: Ticket **has not been started in JIRA**, please click 'Start Progress'.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] raceordie690 commented on pull request #13120: Added concurrency in key places that are always serial, regardless if parallel=true or not

Posted by GitBox <gi...@apache.org>.
raceordie690 commented on PR #13120:
URL: https://github.com/apache/arrow/pull/13120#issuecomment-1123998092

   I have created a PR -->  https://issues.apache.org/jira/browse/ARROW-16530


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] raceordie690 commented on a diff in pull request #13120: ARROW-16530: [Go] Added concurrency in key places that are always serial, regardless if parallel=true or not

Posted by GitBox <gi...@apache.org>.
raceordie690 commented on code in PR #13120:
URL: https://github.com/apache/arrow/pull/13120#discussion_r873051130


##########
go/parquet/pqarrow/column_readers.go:
##########
@@ -216,12 +217,34 @@ func (sr *structReader) GetRepLevels() ([]int16, error) {
 }
 
 func (sr *structReader) LoadBatch(nrecords int64) error {
-	for _, rdr := range sr.children {
-		if err := rdr.LoadBatch(nrecords); err != nil {
-			return err
+	var (
+		// REP -- Load batches in parallel
+		// When reading structs with large numbers of columns, the serial load is very slow.
+		// This is especially true when reading Cloud Storage. Loading concurrently
+		// greatly improves performance.
+		wg      sync.WaitGroup
+		errchan chan error = make(chan error)
+		err     error
+	)
+
+	//* Read First error from errchan and break only capturing first error
+	go func() {
+		for err = range errchan {
+			break
 		}
+	}()
+	wg.Add(len(sr.children))
+	for _, rdr := range sr.children {
+		go func(r *ColumnReader) {
+			defer wg.Done()
+			if err := r.LoadBatch(nrecords); err != nil {
+				errchan <- err
+			}
+		}(rdr)
 	}
-	return nil
+	wg.Wait() // wait for reads to complete
+	close(errchan)

Review Comment:
   I looked at creating a helper, however there are complications.  The functionality of the different areas is such that it would make things complicated, whereas the pattern is pretty straight forward.   ReadRowGroups seems overly complicated to accomplish what needs to be done.  However, I'm reluctant to change something that works.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] zeroshade commented on a diff in pull request #13120: ARROW-16530: [Go] Added concurrency in key places that are always serial, regardless if parallel=true or not

Posted by GitBox <gi...@apache.org>.
zeroshade commented on code in PR #13120:
URL: https://github.com/apache/arrow/pull/13120#discussion_r873058184


##########
go/parquet/pqarrow/file_reader.go:
##########
@@ -202,14 +206,33 @@ func (fr *FileReader) GetFieldReaders(ctx context.Context, colIndices, rowGroups
 
 	out := make([]*ColumnReader, len(fieldIndices))
 	outFields := make([]arrow.Field, len(fieldIndices))
-	for idx, fidx := range fieldIndices {
-		rdr, err := fr.GetFieldReader(ctx, fidx, includedLeaves, rowGroups)
-		if err != nil {
-			return nil, nil, err
-		}
 
-		outFields[idx] = *rdr.Field()
-		out[idx] = rdr
+	// Load batches in parallel
+	// When reading structs with large numbers of columns, the serial load is very slow.
+	// This is especially true when reading Cloud Storage. Loading concurrently
+	// greatly improves performance.
+	// GetFieldReader causes read operations, when issued serially on large numbers of columns,
+	// this is super time consuming. Get field readers concurrently.
+	if fr.Props.Parallel {
+		np = len(fieldIndices)
+	}
+	g := new(errgroup.Group)
+	g.SetLimit(np)
+	for idx, fidx := range fieldIndices {
+		func(idx, fidx int) {
+			g.Go(func() error {
+				rdr, err := fr.GetFieldReader(ctx, fidx, includedLeaves, rowGroups)
+				if err != nil {
+					return err
+				}
+				outFields[idx] = *rdr.Field()
+				out[idx] = rdr
+				return nil
+			})
+		}(idx, fidx)

Review Comment:
   same as above, rather than enclosing it with a function to achieve the safety, you can just do:
   `idx, fidx := idx, fidx` just before you call `g.Go` to create iteration-local copies of the variables. Personally i find it cleaner than enclosing the whole thing in a function.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] zeroshade commented on a diff in pull request #13120: ARROW-16530: [Go] Added concurrency in key places that are always serial, regardless if parallel=true or not

Posted by GitBox <gi...@apache.org>.
zeroshade commented on code in PR #13120:
URL: https://github.com/apache/arrow/pull/13120#discussion_r871929742


##########
go/parquet/pqarrow/column_readers.go:
##########
@@ -216,12 +217,34 @@ func (sr *structReader) GetRepLevels() ([]int16, error) {
 }
 
 func (sr *structReader) LoadBatch(nrecords int64) error {
-	for _, rdr := range sr.children {
-		if err := rdr.LoadBatch(nrecords); err != nil {
-			return err
+	var (
+		// REP -- Load batches in parallel

Review Comment:
   That's what I figured. Given this codebase, I'd leave the initials out please. But the comment is otherwise good. 😄 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] zeroshade commented on pull request #13120: ARROW-16530: [Go] Added concurrency in key places that are always serial, regardless if parallel=true or not

Posted by GitBox <gi...@apache.org>.
zeroshade commented on PR #13120:
URL: https://github.com/apache/arrow/pull/13120#issuecomment-1126773620

   @raceordie690 For now we are not adopting the generics. 
   
   The standard recommendation is to support latest version - 1, so we want to maintain support for Go1.17 for now. When Go 1.19 is released, we can bump to Go1.18 and begin to use Generics.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org