You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by "Eugene Kirpichov (JIRA)" <ji...@apache.org> on 2018/03/05 22:35:00 UTC

[jira] [Created] (BEAM-3778) Very poor performance of side inputs when input is finely sharded

Eugene Kirpichov created BEAM-3778:
--------------------------------------

             Summary: Very poor performance of side inputs when input is finely sharded
                 Key: BEAM-3778
                 URL: https://issues.apache.org/jira/browse/BEAM-3778
             Project: Beam
          Issue Type: Bug
          Components: runner-dataflow
            Reporter: Eugene Kirpichov
            Assignee: Luke Cwik


This thread:
https://lists.apache.org/thread.html/324a4f86e567e3e1692466e70f44a08276123b467bacb2ecbf00515f@%3Cuser.beam.apache.org%3E

The user has a job that reads a few hundred thousand files and then writes their contents to BigQuery. This generates one temp file per input file. We then gather the temp files into a View.asList() side input, and this side input ends up consisting of a few hundred thousand tiny ISM files with one element each. This performs horribly: reading the side input takes hours.

I think we need to reshuffle things onto a reasonable number of shards before writing them to ISM.

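A side issue: this line, https://github.com/apache/beam/blob/v2.2.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ReifyAsIterable.java#L46, also triggers the coder's byte-size estimation logic. That logic wrongly assumes size estimation is cheap in this case, so it does the work twice: it iterates over the entire side input a second time just to estimate its encoded size. The following stack trace shows this (note IterableLikeCoder.registerByteSizeObserver reading through the ISM side input):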

Processing lull for PT30900.015S in state process of WriteTOBigQuery/BatchLoads/ReifyResults/ParDo(Anonymous)
 java.net.SocketInputStream.socketRead0(Native Method)
 java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
 java.net.SocketInputStream.read(SocketInputStream.java:170)
 java.net.SocketInputStream.read(SocketInputStream.java:141)
 sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
 sun.security.ssl.InputRecord.read(InputRecord.java:503)
 sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:983)
 sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:940)
 sun.security.ssl.AppInputStream.read(AppInputStream.java:105)
 java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
 java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
 java.io.BufferedInputStream.read(BufferedInputStream.java:345)
 sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:704)
 sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:647)
 sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1536)
 sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1441)
 java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
 sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:338)
 com.google.api.client.http.javanet.NetHttpResponse.<init>(NetHttpResponse.java:37)
 com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:94)
 com.google.api.client.http.HttpRequest.execute(HttpRequest.java:981)
 com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:419)
 com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
 com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeMedia(AbstractGoogleClientRequest.java:380)
 com.google.api.services.storage.Storage$Objects$Get.executeMedia(Storage.java:4784)
 com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel.openStreamAndSetMetadata(GoogleCloudStorageReadChannel.java:656)
 com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel.performLazySeek(GoogleCloudStorageReadChannel.java:560)
 com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel.read(GoogleCloudStorageReadChannel.java:289)
 sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:65)
 sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:109)
 sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
 java.io.InputStream.read(InputStream.java:101)
 sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:81)
 org.apache.beam.sdk.util.VarInt.decodeLong(VarInt.java:79)
 org.apache.beam.sdk.util.VarInt.decodeInt(VarInt.java:63)
 org.apache.beam.runners.dataflow.internal.IsmFormat$KeyPrefixCoder.decode(IsmFormat.java:694)
 com.google.cloud.dataflow.worker.IsmReader.readKey(IsmReader.java:999)
 com.google.cloud.dataflow.worker.IsmReader.access$2000(IsmReader.java:79)
 com.google.cloud.dataflow.worker.IsmReader$WithinShardIsmReaderIterator.advance(IsmReader.java:952)
 com.google.cloud.dataflow.worker.IsmReader$WithinShardIsmReaderIterator.start(IsmReader.java:942)
 com.google.cloud.dataflow.worker.IsmReader$IsmCacheLoader.call(IsmReader.java:580)
 com.google.cloud.dataflow.worker.IsmReader$IsmCacheLoader.call(IsmReader.java:569)
 com.google.cloud.dataflow.worker.IsmReader$IsmCacheLoader.call(IsmReader.java:554)
 com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4904)
 com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3628)
 com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2336)
 com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2295)
 com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2208)
 com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache.get(LocalCache.java:4053)
 com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4899)
 com.google.cloud.dataflow.worker.IsmReader.fetch(IsmReader.java:605)
 com.google.cloud.dataflow.worker.IsmReader.getBlock(IsmReader.java:770)
 com.google.cloud.dataflow.worker.IsmReader.access$1000(IsmReader.java:79)
 com.google.cloud.dataflow.worker.IsmReader$IsmPrefixReaderIterator.get(IsmReader.java:641)
 com.google.cloud.dataflow.worker.IsmSideInputReader$ListOverReaderIterators.getUsingLong(IsmSideInputReader.java:674)
 com.google.cloud.dataflow.worker.IsmSideInputReader$ListOverReaderIterators.access$1300(IsmSideInputReader.java:620)
 com.google.cloud.dataflow.worker.IsmSideInputReader$ListOverReaderIterators$ListIteratorOverReaderIterators.next(IsmSideInputReader.java:715)
 java.util.Collections$UnmodifiableCollection$1.next(Collections.java:1042)
 org.apache.beam.sdk.coders.IterableLikeCoder.registerByteSizeObserver(IterableLikeCoder.java:195)
 org.apache.beam.sdk.coders.IterableLikeCoder.registerByteSizeObserver(IterableLikeCoder.java:60)
 org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:685)
 org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:599)
 com.google.cloud.dataflow.worker.MapTaskExecutorFactory$ElementByteSizeObservableCoder.registerByteSizeObserver(MapTaskExecutorFactory.java:520)
 com.google.cloud.dataflow.worker.util.common.worker.OutputObjectAndByteCounter.update(OutputObjectAndByteCounter.java:134)
 com.google.cloud.dataflow.worker.DataflowOutputCounter.update(DataflowOutputCounter.java:63)
 com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:46)
 com.google.cloud.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:272)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)