Posted to commits@druid.apache.org by "gianm (via GitHub)" <gi...@apache.org> on 2023/03/28 09:26:50 UTC

[GitHub] [druid] gianm commented on a diff in pull request #13773: Add window-focused tests from Drill

gianm commented on code in PR #13773:
URL: https://github.com/apache/druid/pull/13773#discussion_r1150255111


##########
processing/src/main/java/org/apache/druid/jackson/DruidDefaultSerializersModule.java:
##########
@@ -182,5 +184,21 @@ public ByteOrder deserialize(JsonParser jp, DeserializationContext ctxt) throws
         }
     );
     addDeserializer(ResponseContext.class, new ResponseContextDeserializer());
+
+    addSerializer(RowsAndColumns.class, new JsonSerializer<RowsAndColumns>()
+    {
+      @Override
+      public void serialize(
+          RowsAndColumns value,
+          JsonGenerator gen,
+          SerializerProvider serializers
+      ) throws IOException
+      {
+        // It would be really cool if jackson offered an output stream that would allow us to push bytes

Review Comment:
   Would `gen.writeBinary(InputStream, int)` do it? That is, have `WireTransferable` return an `InputStream` instead of a `byte[]`.
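
Below is a minimal, dependency-free sketch of that shape. `StreamingWireTransferable`, `bytesToTransfer()`, `numBytes()`, `StreamingWire`, and `TwoChunkTransferable` are hypothetical names, not Druid API. A real serializer would pass the stream and length to Jackson's `gen.writeBinary(InputStream, int)`. Here, `copyTo` stands in for that call. It pulls exactly `numBytes()` bytes through a small buffer, so the payload never has to be assembled into one `byte[]` first.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.SequenceInputStream;

// Hypothetical streaming variant of WireTransferable: the payload is exposed as a
// stream of known length rather than as one materialized byte[].
interface StreamingWireTransferable
{
  InputStream bytesToTransfer();

  int numBytes();
}

final class StreamingWire
{
  private StreamingWire()
  {
  }

  // Stand-in for gen.writeBinary(InputStream, int): copies exactly numBytes()
  // bytes to the output through a fixed-size buffer.
  static void copyTo(StreamingWireTransferable src, OutputStream out) throws IOException
  {
    final byte[] buf = new byte[4096];
    int remaining = src.numBytes();
    try (InputStream in = src.bytesToTransfer()) {
      while (remaining > 0) {
        final int n = in.read(buf, 0, Math.min(buf.length, remaining));
        if (n < 0) {
          throw new IOException("Stream ended early; " + remaining + " bytes still expected");
        }
        out.write(buf, 0, n);
        remaining -= n;
      }
    }
  }
}

// Hypothetical example implementation: two existing chunks are chained lazily
// instead of being copied into a single array up front.
final class TwoChunkTransferable implements StreamingWireTransferable
{
  private final byte[] header;
  private final byte[] body;

  TwoChunkTransferable(byte[] header, byte[] body)
  {
    this.header = header;
    this.body = body;
  }

  @Override
  public InputStream bytesToTransfer()
  {
    return new SequenceInputStream(new ByteArrayInputStream(header), new ByteArrayInputStream(body));
  }

  @Override
  public int numBytes()
  {
    return header.length + body.length;
  }
}
```

The main design point is that the implementation decides how its bytes are produced. For example, it can chain existing buffers, as `TwoChunkTransferable` does, instead of being forced to concatenate them.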



##########
processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/WireTransferable.java:
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.rowsandcols.semantic;
+
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
+
+public interface WireTransferable
+{
+  static WireTransferable fromRAC(RowsAndColumns rac)
+  {
+    WireTransferable retVal = rac.as(WireTransferable.class);

Review Comment:
   A disadvantage of `as` is that it's not clear which classes can be passed in, or what each one enables. I think we should keep a list somewhere. Maybe in the Javadoc for `RowsAndColumns#as` itself?
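
Here is one possible shape for that list, sketched on simplified stand-ins. `RowsAndColumnsSketch` and `SerializedRowsAndColumns` are hypothetical names, and the `WireTransferable` method name `bytesToTransfer()` is an assumption. The two capability descriptions come only from what this PR shows. A real list would have to be filled in from the codebase.

```java
// Simplified stand-in for the WireTransferable interface in this PR; the
// method name bytesToTransfer() is an assumption.
interface WireTransferable
{
  byte[] bytesToTransfer();
}

// Simplified stand-in for RowsAndColumns; other methods omitted.
interface RowsAndColumnsSketch
{
  /**
   * Asks this object for an optional capability, identified by interface.
   *
   * <p>Known capability classes (update this list when adding a new one):
   * <ul>
   *   <li>{@link WireTransferable}: serialize this data for transfer over the wire.</li>
   *   <li>{@code DecoratableRowsAndColumns}: layer a time range, filter, virtual columns,
   *       limit, column projection, or ordering on top of this data.</li>
   * </ul>
   *
   * @param clazz the capability interface being requested
   * @return an implementation of {@code clazz}, or {@code null} if this object
   *         does not know how to provide it
   */
  <T> T as(Class<T> clazz);
}

// Hypothetical implementation that supports exactly one documented capability.
final class SerializedRowsAndColumns implements RowsAndColumnsSketch
{
  private final byte[] payload;

  SerializedRowsAndColumns(byte[] payload)
  {
    this.payload = payload;
  }

  @Override
  public <T> T as(Class<T> clazz)
  {
    if (clazz == WireTransferable.class) {
      // Safe: clazz is exactly WireTransferable.class here.
      return clazz.cast((WireTransferable) () -> payload.clone());
    }
    return null;
  }
}
```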



##########
processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/FrameRowsAndColumns.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.rowsandcols.concrete;
+
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.read.columnar.FrameColumnReaders;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import org.apache.druid.query.rowsandcols.column.Column;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+
+import javax.annotation.Nullable;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+
+public class FrameRowsAndColumns implements RowsAndColumns
+{
+  private final Frame frame;
+  private final RowSignature signature;
+  private final LinkedHashMap<String, Column> colCache = new LinkedHashMap<>();
+
+  public FrameRowsAndColumns(Frame frame, RowSignature signature)
+  {
+    this.frame = frame;
+    this.signature = signature;
+  }
+
+  @Override
+  public Collection<String> getColumnNames()
+  {
+    return signature.getColumnNames();
+  }
+
+  @Override
+  public int numRows()
+  {
+    return frame.numRows();
+  }
+
+  @Nullable
+  @Override
+  public Column findColumn(String name)
+  {
+    // Use contains so that we can negative cache.
+    if (!colCache.containsKey(name)) {
+      final int columnIndex = signature.indexOf(name);
+      if (columnIndex < 0) {
+        colCache.put(name, null);
+      } else {
+        final ColumnType columnType = signature
+            .getColumnType(columnIndex)
+            .orElseThrow(() -> new ISE("just got the id, why is columnType not there?"));
+
+        colCache.put(name, FrameColumnReaders.create(columnIndex, columnType).readRACColumn(frame));

Review Comment:
   I've been trying to reuse `FrameColumnReader` and `FieldReader` as much as possible (and to centralize logic for reading from frames) by putting code like this into `FrameReader`.
   
   I suggest making this a method there, like `readRACColumn(Frame frame, String columnName)`. Its implementation would be similar to `columnCapabilities(Frame frame, String columnName)`. If this code is later extended to handle row-based frames too, having it inside `FrameReader` would provide a nice abstraction.
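
Here is a rough sketch of what that could look like. It uses simplified stand-ins for `Frame`, `Column`, and `FrameColumnReader`; the real Druid types carry much more. `FrameReaderSketch` is a hypothetical name. The name lookup and the null-for-missing behavior mirror `findColumn` above. `FrameRowsAndColumns` would then just call `frameReader.readRACColumn(frame, name)`.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for Druid's Frame, Column, and FrameColumnReader.
interface Frame
{
  int numRows();
}

interface Column
{
  Object valueAt(int rowNum);
}

interface FrameColumnReader
{
  Column readRACColumn(Frame frame);
}

// Hypothetical FrameReader-like class that owns the name -> reader mapping, so
// callers never deal with column indexes or per-type readers directly.
final class FrameReaderSketch
{
  private final List<String> columnNames;
  private final List<FrameColumnReader> columnReaders;

  FrameReaderSketch(List<String> columnNames, List<FrameColumnReader> columnReaders)
  {
    if (columnNames.size() != columnReaders.size()) {
      throw new IllegalArgumentException("Need exactly one reader per column");
    }
    this.columnNames = new ArrayList<>(columnNames);
    this.columnReaders = new ArrayList<>(columnReaders);
  }

  /**
   * Returns the named column as a RAC column, or null if the frame's signature
   * does not contain it.
   */
  Column readRACColumn(Frame frame, String columnName)
  {
    final int columnIndex = columnNames.indexOf(columnName);
    if (columnIndex < 0) {
      return null;
    }
    return columnReaders.get(columnIndex).readRACColumn(frame);
  }
}
```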



##########
processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/FrameRowsAndColumns.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.rowsandcols.concrete;
+
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.read.columnar.FrameColumnReaders;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import org.apache.druid.query.rowsandcols.column.Column;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+
+import javax.annotation.Nullable;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+
+public class FrameRowsAndColumns implements RowsAndColumns
+{
+  private final Frame frame;
+  private final RowSignature signature;
+  private final LinkedHashMap<String, Column> colCache = new LinkedHashMap<>();
+
+  public FrameRowsAndColumns(Frame frame, RowSignature signature)
+  {
+    this.frame = frame;
+    this.signature = signature;
+  }
+
+  @Override
+  public Collection<String> getColumnNames()
+  {
+    return signature.getColumnNames();
+  }
+
+  @Override
+  public int numRows()
+  {
+    return frame.numRows();
+  }
+
+  @Nullable
+  @Override
+  public Column findColumn(String name)
+  {
+    // Use contains so that we can negative cache.
+    if (!colCache.containsKey(name)) {
+      final int columnIndex = signature.indexOf(name);
+      if (columnIndex < 0) {
+        colCache.put(name, null);
+      } else {
+        final ColumnType columnType = signature
+            .getColumnType(columnIndex)
+            .orElseThrow(() -> new ISE("just got the id, why is columnType not there?"));
+
+        colCache.put(name, FrameColumnReaders.create(columnIndex, columnType).readRACColumn(frame));
+      }
+    }
+    return colCache.get(name);
+
+  }
+
+  @Nullable
+  @Override
+  public <T> T as(Class<T> clazz)
+  {
+    throw new UnsupportedOperationException();

Review Comment:
   Is `return null` a better implementation if we don't know how to become anything else?
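
Returning `null` lets callers such as `WireTransferable.fromRAC` detect the missing capability and decide what to do. Throwing `UnsupportedOperationException` takes that choice away from them. The minimal sketch below uses simplified stand-ins (`Rac`, `NoCapabilityRac`, and `Wire` are hypothetical names). Its fallback encoding is purely illustrative and only shows where a fallback would plug in.

```java
import java.nio.ByteBuffer;

// Simplified stand-ins; not the real Druid interfaces.
interface WireTransferable
{
  byte[] bytesToTransfer();
}

interface Rac
{
  int numRows();

  <T> T as(Class<T> clazz);
}

// Like FrameRowsAndColumns with the suggested change: unknown capabilities yield null.
final class NoCapabilityRac implements Rac
{
  @Override
  public int numRows()
  {
    return 3;
  }

  @Override
  public <T> T as(Class<T> clazz)
  {
    return null;
  }
}

final class Wire
{
  private Wire()
  {
  }

  // Because as() returns null rather than throwing, the caller can pick a fallback.
  static WireTransferable fromRac(Rac rac)
  {
    final WireTransferable retVal = rac.as(WireTransferable.class);
    if (retVal != null) {
      return retVal;
    }
    // Hypothetical fallback: encodes only the row count, as a placeholder for a
    // real generic serialization.
    final byte[] bytes = ByteBuffer.allocate(Integer.BYTES).putInt(rac.numRows()).array();
    return () -> bytes;
  }
}
```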



##########
processing/src/main/java/org/apache/druid/frame/read/columnar/ComplexFrameColumnReader.java:
##########
@@ -47,6 +51,38 @@ public class ComplexFrameColumnReader implements FrameColumnReader
     this.columnNumber = columnNumber;
   }
 
+  @Override
+  public Column readRACColumn(Frame frame)

Review Comment:
   There's a lot of copy-paste here. Could it be factored out into a method that makes a `ComplexFrameColumn`, which both the RACColumn and ColumnPlus readers then call?
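
Here is a sketch of the suggested refactor on simplified stand-ins. `ComplexColumnSketch`, `ColumnPlusSketch`, `ComplexReaderSketch`, `columnValues`, and this `Frame` shape are all hypothetical, not Druid's types. Validation and column construction live in one private method, and both read paths wrap its result. The same pattern would apply to the double reader and the other readers.

```java
// Simplified stand-in: a frame exposes each column's values by column number.
interface Frame
{
  int numColumns();

  Object[] columnValues(int columnNumber);
}

interface Column
{
  Object valueAt(int rowNum);
}

// Stand-in for ComplexFrameColumn: the object both read paths need.
final class ComplexColumnSketch
{
  private final Object[] values;

  ComplexColumnSketch(Object[] values)
  {
    this.values = values;
  }

  Object getObject(int rowNum)
  {
    return values[rowNum];
  }
}

// Stand-in for ColumnPlus: the column plus some metadata.
final class ColumnPlusSketch
{
  final ComplexColumnSketch column;
  final String typeName;

  ColumnPlusSketch(ComplexColumnSketch column, String typeName)
  {
    this.column = column;
    this.typeName = typeName;
  }
}

final class ComplexReaderSketch
{
  private final int columnNumber;
  private final String typeName;

  ComplexReaderSketch(int columnNumber, String typeName)
  {
    this.columnNumber = columnNumber;
    this.typeName = typeName;
  }

  // The single place that validates the frame and builds the column.
  private ComplexColumnSketch makeComplexColumn(Frame frame)
  {
    if (columnNumber < 0 || columnNumber >= frame.numColumns()) {
      throw new IllegalArgumentException("Frame has no column number[" + columnNumber + "]");
    }
    return new ComplexColumnSketch(frame.columnValues(columnNumber));
  }

  Column readRACColumn(Frame frame)
  {
    return makeComplexColumn(frame)::getObject;
  }

  ColumnPlusSketch readColumn(Frame frame)
  {
    return new ColumnPlusSketch(makeComplexColumn(frame), typeName);
  }
}
```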



##########
processing/src/main/java/org/apache/druid/frame/read/columnar/DoubleFrameColumnReader.java:
##########
@@ -48,6 +51,18 @@ public class DoubleFrameColumnReader implements FrameColumnReader
     this.columnNumber = columnNumber;
   }
 
+  @Override
+  public Column readRACColumn(Frame frame)

Review Comment:
   Similar copy-paste comment for this reader and the others.



##########
processing/src/main/java/org/apache/druid/query/rowsandcols/DecoratedRowsAndColumns.java:
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.rowsandcols;
+
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.FrameType;
+import org.apache.druid.frame.allocation.ArenaMemoryAllocatorFactory;
+import org.apache.druid.frame.key.KeyColumn;
+import org.apache.druid.frame.key.KeyOrder;
+import org.apache.druid.frame.write.FrameWriter;
+import org.apache.druid.frame.write.FrameWriterFactory;
+import org.apache.druid.frame.write.FrameWriters;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.query.filter.Filter;
+import org.apache.druid.query.operator.ColumnWithDirection;
+import org.apache.druid.query.rowsandcols.column.Column;
+import org.apache.druid.query.rowsandcols.concrete.FrameRowsAndColumns;
+import org.apache.druid.query.rowsandcols.semantic.DecoratableRowsAndColumns;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.StorageAdapter;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.RowSignature;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class DecoratedRowsAndColumns implements DecoratableRowsAndColumns
+{
+  private final RowsAndColumns base;
+
+  private RowsAndColumns materialized = null;
+
+  private Interval interval = null;
+  private Filter filter = null;
+  private VirtualColumns virtualColumns = null;
+  private int limit = -1;
+  private LinkedHashSet<String> viewableColumns = null;
+  private List<ColumnWithDirection> ordering = null;
+
+  public DecoratedRowsAndColumns(
+      RowsAndColumns base
+  )
+  {
+    this.base = base;
+  }
+
+  @Override
+  public Collection<String> getColumnNames()
+  {
+    return viewableColumns == null ? base.getColumnNames() : viewableColumns;
+  }
+
+  @Override
+  public int numRows()
+  {
+    return materializedOrBase().numRows();
+  }
+
+  @Nullable
+  @Override
+  public Column findColumn(String name)
+  {
+    if (viewableColumns != null && !viewableColumns.contains(name)) {
+      return null;
+    }
+
+    return materializedOrBase().findColumn(name);
+  }
+
+  @Nullable
+  @Override
+  public <T> T as(Class<T> clazz)
+  {
+    return null;
+  }
+
+  @Override
+  public void limitTimeRange(Interval interval)
+  {
+    this.interval = interval;

Review Comment:
   Should this throw an error if `interval` was already set? Otherwise, if it's called twice by mistake, the prior value is silently lost. If dropping the prior value is intentional, it would be nice if the javadoc said so.
   
   (Similar comment for the other methods.)
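
Here is a minimal sketch of the fail-on-second-call option. It uses a simplified stand-in for Joda's `Interval`; `TimeRange` and `DecoratedSketch` are hypothetical names. The same guard would apply to the other setters.

```java
import java.util.Objects;

// Simplified stand-in for org.joda.time.Interval: [startMillis, endMillis).
final class TimeRange
{
  final long startMillis;
  final long endMillis;

  TimeRange(long startMillis, long endMillis)
  {
    if (endMillis < startMillis) {
      throw new IllegalArgumentException("end before start");
    }
    this.startMillis = startMillis;
    this.endMillis = endMillis;
  }

  @Override
  public String toString()
  {
    return "[" + startMillis + ", " + endMillis + ")";
  }
}

final class DecoratedSketch
{
  private TimeRange interval = null;

  /**
   * Restricts rows to the given time range. May be called at most once; a second
   * call fails instead of silently discarding the first range.
   */
  void limitTimeRange(TimeRange interval)
  {
    Objects.requireNonNull(interval, "interval");
    if (this.interval != null) {
      throw new IllegalStateException(
          "Time range already set to " + this.interval + ", refusing to replace it with " + interval
      );
    }
    this.interval = interval;
  }
}
```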



##########
processing/src/main/java/org/apache/druid/query/operator/NaivePartitioningOperator.java:
##########
@@ -55,33 +57,120 @@ public NaivePartitioningOperator(
   @Override
   public Closeable goOrContinue(Closeable continuation, Receiver receiver)
   {
-    return child.goOrContinue(
+    if (continuation != null) {
+      Continuation cont = (Continuation) continuation;
+
+      if (cont.iter != null) {
+        while (cont.iter.hasNext()) {
+          final Signal signal = receiver.push(cont.iter.next());
+          switch (signal) {
+            case GO:
+              break;
+            case PAUSE:
+              if (cont.iter.hasNext()) {
+                return cont;
+              }
+
+              if (cont.subContinuation == null) {
+                // We were finished anyway
+                receiver.completed();
+                return null;
+              }
+
+              return new Continuation(null, cont.subContinuation);
+            case STOP:
+              receiver.completed();
+              try {
+                cont.close();
+              }
+              catch (IOException e) {
+                throw new RE(e, "Unable to close continuation");
+              }
+              return null;
+            default:
+              throw new RE("Unknown signal[%s]", signal);
+          }
+        }
+
+        if (cont.subContinuation == null) {
+          receiver.completed();
+          return null;
+        }
+      }
+
+      continuation = cont.subContinuation;
+    }
+
+    AtomicReference<Iterator<RowsAndColumns>> iterHolder = new AtomicReference<>();
+
+    final Closeable retVal = child.goOrContinue(
         continuation,
         new Receiver()
         {
           @Override
           public Signal push(RowsAndColumns rac)
           {
+            if (rac == null) {
+              throw new ISE("huh?");

Review Comment:
   If I ever see this in prod I am gonna definitely say… "huh?"



##########
processing/src/main/java/org/apache/druid/frame/read/columnar/FrameColumnReader.java:
##########
@@ -28,6 +29,11 @@
  */
 public interface FrameColumnReader
 {
+  /**
+   * Returns a {@link Column} from the frame.
+   */
+  Column readRACColumn(Frame frame);

Review Comment:
   Is it OK that only the columnar frames are readable as RAC?
   
   The row-oriented frames are currently more common in prod, because they support fast sorting and merging, which is very important for most current usages of frames. (Keys are contiguous in memory, so we can compare keys using memory comparisons as bytes.)
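
This sketch illustrates the general technique the comment refers to. It is not Druid's actual row-frame key format. If each key field is encoded so that unsigned byte order matches the field's sort order, keys can be compared byte by byte without decoding. For a signed `long`, flipping the sign bit and writing big-endian is enough. `ByteComparableKeys` is a hypothetical name.

```java
import java.nio.ByteBuffer;

final class ByteComparableKeys
{
  private ByteComparableKeys()
  {
  }

  // Flip the sign bit so negatives sort before positives, then write big-endian
  // (ByteBuffer's default order) so the most significant byte comes first.
  static byte[] encodeLong(long value)
  {
    return ByteBuffer.allocate(Long.BYTES).putLong(value ^ Long.MIN_VALUE).array();
  }

  // memcmp-style comparison: unsigned, byte by byte, shorter key first on a tie.
  static int compareKeys(byte[] a, byte[] b)
  {
    final int n = Math.min(a.length, b.length);
    for (int i = 0; i < n; i++) {
      final int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
      if (cmp != 0) {
        return cmp;
      }
    }
    return Integer.compare(a.length, b.length);
  }
}
```

Concatenating fixed-width encodings like this yields composite keys that still compare correctly as raw bytes. This is the property that makes contiguous key storage pay off for sorting and merging.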



##########
processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/FrameRowsAndColumns.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.rowsandcols.concrete;
+
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.read.columnar.FrameColumnReaders;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import org.apache.druid.query.rowsandcols.column.Column;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+
+import javax.annotation.Nullable;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+
+public class FrameRowsAndColumns implements RowsAndColumns
+{
+  private final Frame frame;
+  private final RowSignature signature;
+  private final LinkedHashMap<String, Column> colCache = new LinkedHashMap<>();
+
+  public FrameRowsAndColumns(Frame frame, RowSignature signature)
+  {
+    this.frame = frame;

Review Comment:
   This code only works for columnar frames, so I'd suggest adding a check here so that it fails fast. This is how it's done elsewhere:
   
   ```
   this.frame = FrameType.COLUMNAR.ensureType(frame);
   ```
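
Here is a minimal sketch of the fail-fast constructor check on hypothetical stand-ins: `FrameTypeSketch`, `FrameSketch`, and `FrameRowsAndColumnsSketch`. It assumes `ensureType` returns the frame unchanged when the type matches and throws otherwise. The exact exception type the real `FrameType#ensureType` throws is not confirmed here.

```java
// Stand-in for FrameType with an ensureType-style check.
enum FrameTypeSketch
{
  COLUMNAR,
  ROW_BASED;

  FrameSketch ensureType(FrameSketch frame)
  {
    if (frame == null) {
      throw new NullPointerException("frame");
    }
    if (frame.type() != this) {
      throw new IllegalArgumentException("Expected frame type[" + this + "] but got[" + frame.type() + "]");
    }
    return frame;
  }
}

// Stand-in for Frame: only the type matters here.
final class FrameSketch
{
  private final FrameTypeSketch type;

  FrameSketch(FrameTypeSketch type)
  {
    this.type = type;
  }

  FrameTypeSketch type()
  {
    return type;
  }
}

final class FrameRowsAndColumnsSketch
{
  private final FrameSketch frame;

  FrameRowsAndColumnsSketch(FrameSketch frame)
  {
    // Fail fast at construction instead of on the first column read.
    this.frame = FrameTypeSketch.COLUMNAR.ensureType(frame);
  }

  FrameSketch frame()
  {
    return frame;
  }
}
```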



##########
processing/src/main/java/org/apache/druid/query/rowsandcols/DecoratedRowsAndColumns.java:
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.rowsandcols;
+
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.FrameType;
+import org.apache.druid.frame.allocation.ArenaMemoryAllocatorFactory;
+import org.apache.druid.frame.key.KeyColumn;
+import org.apache.druid.frame.key.KeyOrder;
+import org.apache.druid.frame.write.FrameWriter;
+import org.apache.druid.frame.write.FrameWriterFactory;
+import org.apache.druid.frame.write.FrameWriters;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.query.filter.Filter;
+import org.apache.druid.query.operator.ColumnWithDirection;
+import org.apache.druid.query.rowsandcols.column.Column;
+import org.apache.druid.query.rowsandcols.concrete.FrameRowsAndColumns;
+import org.apache.druid.query.rowsandcols.semantic.DecoratableRowsAndColumns;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.StorageAdapter;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.RowSignature;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class DecoratedRowsAndColumns implements DecoratableRowsAndColumns

Review Comment:
   Javadoc would be nice. Also, the concrete impl having a similar and only subtly different name to the interface is tricky on the eyes. Maybe `DecoratableRowsAndColumnsImpl` would be a better name.



##########
processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DecoratableRowsAndColumns.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.rowsandcols.semantic;
+
+import org.apache.druid.query.filter.Filter;
+import org.apache.druid.query.operator.ColumnWithDirection;
+import org.apache.druid.query.rowsandcols.DecoratedRowsAndColumns;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import org.apache.druid.segment.VirtualColumns;
+import org.joda.time.Interval;
+
+import java.util.List;
+
+public interface DecoratableRowsAndColumns extends RowsAndColumns

Review Comment:
   Javadocs would be nice. Not immediately obvious what this is for.



##########
processing/src/main/java/org/apache/druid/query/operator/SegmentToRowsAndColumnsOperator.java:
##########
@@ -48,6 +48,9 @@ public Closeable goOrContinue(Closeable continuation, Receiver receiver)
       }
 
       RowsAndColumns rac = shifty.as(RowsAndColumns.class);
+      if (shifty instanceof RowsAndColumns) {
+        rac = (RowsAndColumns) shifty;

Review Comment:
   Shouldn't `shifty` have returned `this` from `as` in this case? I wonder why we need this block.
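
Suppose `as` follows the convention of returning `this` whenever the object already implements the requested interface. Then the `instanceof` block becomes unnecessary. The minimal sketch below uses simplified stand-ins; `SegmentSketch`, `RacSketch`, and `InMemorySegment` are hypothetical, not Druid's `Segment` or `RowsAndColumns`.

```java
// Simplified stand-ins for Segment and RowsAndColumns.
interface SegmentSketch
{
  <T> T as(Class<T> clazz);
}

interface RacSketch
{
  int numRows();
}

// A segment that is itself a RowsAndColumns.
final class InMemorySegment implements SegmentSketch, RacSketch
{
  @Override
  public int numRows()
  {
    return 2;
  }

  @Override
  public <T> T as(Class<T> clazz)
  {
    // Convention: if we already are the requested type, hand back ourselves.
    if (clazz.isInstance(this)) {
      return clazz.cast(this);
    }
    return null;
  }
}
```

With this convention, `shifty.as(RacSketch.class)` alone returns the segment itself, so callers need no separate type check.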



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
For additional commands, e-mail: commits-help@druid.apache.org