You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2022/02/18 12:54:43 UTC

[GitHub] [druid] abhishekagarwal87 opened a new pull request #12269: Reuse the JsonReader in SettableByteEntityReader

abhishekagarwal87 opened a new pull request #12269:
URL: https://github.com/apache/druid/pull/12269


   Using `InputFormat` instead of `ParseSpec` can be very slow if complex flattening scheme is used. This is because with InputFormat, we create an `InputEntityReader` for every stream chunk. It was assumed that creating the reader is a cheap operation but it is not in some cases. 
   
   Here are two flame graphs to illustrate the difference
   Task runner thread with InputFormat
   ![Screenshot 2022-02-18 at 1 42 58 PM](https://user-images.githubusercontent.com/1477457/154686173-9f8551b0-54b0-4986-b601-ae54ed62684d.png)
   
   Task runner thread with ParseSpec
   ![Screenshot 2022-02-18 at 1 41 46 PM](https://user-images.githubusercontent.com/1477457/154686201-03b5c9eb-efaa-4f16-aefa-74530da6dcf7.png)
   
   Instead of fixing the JsonParser, I have added a new class called `SettableByteEntity` that allows one to add new binary data without creating a new InputStream. So we can create an InputEntityReader just once instead of creating it for every stream chunk. 
   
   This PR has:
   - [ ] been self-reviewed.
      - [ ] using the [concurrency checklist](https://github.com/apache/druid/blob/master/dev/code-review/concurrency.md) (Remove this item if the PR doesn't have any relation to concurrency.)
   - [ ] added documentation for new or modified features or behaviors.
   - [ ] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
   - [ ] added or updated version, license, or notice information in [licenses.yaml](https://github.com/apache/druid/blob/master/dev/license.md)
   - [ ] added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
   - [ ] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for [code coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md) is met.
   - [ ] added integration tests.
   - [ ] been tested in a test Druid cluster.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] jihoonson commented on pull request #12269: Reuse the InputEntityReader in SettableByteEntityReader

Posted by GitBox <gi...@apache.org>.
jihoonson commented on pull request #12269:
URL: https://github.com/apache/druid/pull/12269#issuecomment-1044991290


   @abhishekagarwal87 thanks for making this PR. The overall idea SGTM. Can you check the Travis failure? Two indexing module test jobs timed out while running kinesis ingestion tests which seems suspicious. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] kfaraz commented on a change in pull request #12269: Reuse the InputEntityReader in SettableByteEntityReader

Posted by GitBox <gi...@apache.org>.
kfaraz commented on a change in pull request #12269:
URL: https://github.com/apache/druid/pull/12269#discussion_r822432410



##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SettableByteEntity.java
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream;
+
+import com.google.common.base.Preconditions;
+import org.apache.druid.data.input.InputEntity;
+import org.apache.druid.data.input.impl.ByteEntity;
+import org.apache.druid.io.ByteBufferInputStream;
+import org.apache.druid.java.util.common.IAE;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.ByteBuffer;
+
+/**
+ * This class is only to be used with {@link SettableByteEntityReader} and {code KafkaInputFormat}. It is useful for stream
+ * processing where binary records are arriving as a list but {@link org.apache.druid.data.input.InputEntityReader}, that
+ * parses the data, expects an {@link InputStream}. This class mimics a continuous InputStream while behind the scenes,
+ * binary records are being put one after the other that the InputStream consumes bytes from. One record is fully
+ * consumed and only then the next record is set. This class doesn't allow reading the same data twice.
+ * This class solely exists to overcome the limitations imposed by interfaces for reading and parsing data.
+ *
+ */
+@NotThreadSafe
+public class SettableByteEntity implements InputEntity
+{
+  private final SettableByteBufferInputStream inputStream;
+  private boolean opened = false;
+  private ByteEntity entity;
+
+  public SettableByteEntity()
+  {
+    this.inputStream = new SettableByteBufferInputStream();
+  }
+
+  public void setEntity(ByteEntity entity)
+  {
+    inputStream.setBuffer(entity.getBuffer());
+    this.entity = entity;
+    opened = false;
+  }
+
+  @Nullable
+  @Override
+  public URI getUri()
+  {
+    return null;
+  }
+
+  public ByteEntity getEntity()
+  {
+    return entity;
+  }
+
+  /**
+   * This method can be called multiple times only for different data. So you can open a new input stream
+   * only after a new buffer is in use.
+   */
+  @Override
+  public InputStream open()
+  {
+    if (opened) {
+      throw new IllegalArgumentException("Can't open the input stream on SettableByteEntity more than once");
+    }
+
+    opened = true;
+    return inputStream;
+  }
+
+  public static final class SettableByteBufferInputStream extends InputStream
+  {
+    @Nullable
+    private ByteBufferInputStream delegate;
+
+    public void setBuffer(ByteBuffer newBuffer)
+    {
+      if (null != delegate && available() > 0) {
+        throw new IAE("New data cannot be set in buffer till all the old data has been read");
+      }
+      this.delegate = new ByteBufferInputStream(newBuffer);
+    }
+
+    @Override
+    public int read()
+    {
+      Preconditions.checkNotNull(delegate, "Buffet is not set");

Review comment:
       Nit:
   ```suggestion
         Preconditions.checkNotNull(delegate, "Buffer is not set");
   ```
   
   Hope, it wasn't on purpose! 😂 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #12269: Reuse the InputEntityReader in SettableByteEntityReader

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on a change in pull request #12269:
URL: https://github.com/apache/druid/pull/12269#discussion_r822439243



##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SettableByteEntity.java
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream;
+
+import com.google.common.base.Preconditions;
+import org.apache.druid.data.input.InputEntity;
+import org.apache.druid.data.input.impl.ByteEntity;
+import org.apache.druid.io.ByteBufferInputStream;
+import org.apache.druid.java.util.common.IAE;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.ByteBuffer;
+
+/**
+ * This class is only to be used with {@link SettableByteEntityReader} and {code KafkaInputFormat}. It is useful for stream
+ * processing where binary records are arriving as a list but {@link org.apache.druid.data.input.InputEntityReader}, that
+ * parses the data, expects an {@link InputStream}. This class mimics a continuous InputStream while behind the scenes,
+ * binary records are being put one after the other that the InputStream consumes bytes from. One record is fully
+ * consumed and only then the next record is set. This class doesn't allow reading the same data twice.
+ * This class solely exists to overcome the limitations imposed by interfaces for reading and parsing data.
+ *
+ */
+@NotThreadSafe
+public class SettableByteEntity implements InputEntity
+{
+  private final SettableByteBufferInputStream inputStream;
+  private boolean opened = false;
+  private ByteEntity entity;
+
+  public SettableByteEntity()
+  {
+    this.inputStream = new SettableByteBufferInputStream();
+  }
+
+  public void setEntity(ByteEntity entity)
+  {
+    inputStream.setBuffer(entity.getBuffer());
+    this.entity = entity;
+    opened = false;
+  }
+
+  @Nullable
+  @Override
+  public URI getUri()
+  {
+    return null;
+  }
+
+  public ByteEntity getEntity()
+  {
+    return entity;
+  }
+
+  /**
+   * This method can be called multiple times only for different data. So you can open a new input stream
+   * only after a new buffer is in use.
+   */
+  @Override
+  public InputStream open()
+  {
+    if (opened) {
+      throw new IllegalArgumentException("Can't open the input stream on SettableByteEntity more than once");
+    }
+
+    opened = true;
+    return inputStream;
+  }
+
+  public static final class SettableByteBufferInputStream extends InputStream
+  {
+    @Nullable
+    private ByteBufferInputStream delegate;
+
+    public void setBuffer(ByteBuffer newBuffer)
+    {
+      if (null != delegate && available() > 0) {
+        throw new IAE("New data cannot be set in buffer till all the old data has been read");
+      }
+      this.delegate = new ByteBufferInputStream(newBuffer);
+    }
+
+    @Override
+    public int read()
+    {
+      Preconditions.checkNotNull(delegate, "Buffet is not set");

Review comment:
       lol. I should fix this. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] abhishekagarwal87 commented on pull request #12269: Reuse the InputEntityReader in SettableByteEntityReader

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on pull request #12269:
URL: https://github.com/apache/druid/pull/12269#issuecomment-1046177188


   @jihoonson - Yes. The failures are definitely related. I will address them. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] abhishekagarwal87 commented on pull request #12269: Reuse the InputEntityReader in SettableByteEntityReader

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on pull request #12269:
URL: https://github.com/apache/druid/pull/12269#issuecomment-1062723924


   Thank you @kfaraz. I have addressed your comments. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] clintropolis merged pull request #12269: Reuse the InputEntityReader in SettableByteEntityReader

Posted by GitBox <gi...@apache.org>.
clintropolis merged pull request #12269:
URL: https://github.com/apache/druid/pull/12269


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org