You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/12/19 14:44:12 UTC

[GitHub] [beam] piotr-szuberski commented on a change in pull request #13572: [BEAM-11482] Thrift support for KafkaTableProvider

piotr-szuberski commented on a change in pull request #13572:
URL: https://github.com/apache/beam/pull/13572#discussion_r546242274



##########
File path: CHANGES.md
##########
@@ -66,6 +66,7 @@
 * Added Cloud Bigtable Provider extension to Beam SQL ([BEAM-11173](https://issues.apache.org/jira/browse/BEAM-11173), [BEAM-11373](https://issues.apache.org/jira/browse/BEAM-11373))
 * Added a schema provider for thrift data ([BEAM-11338](https://issues.apache.org/jira/browse/BEAM-11338))
 * Added combiner packing pipeline optimization to Dataflow runner. ([BEAM-10641](https://issues.apache.org/jira/browse/BEAM-10641))
+* Added support for thrift in KafkaTableProvider ([BEAM-11482](https://issues.apache.org/jira/browse/BEAM-11482))

Review comment:
       It would be good to update the external table documentation (create-external-table.md) as well.

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaThriftTable.java
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+import org.apache.beam.sdk.io.thrift.ThriftCoder;
+import org.apache.beam.sdk.io.thrift.ThriftSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaProvider;
+import org.apache.beam.sdk.schemas.transforms.Convert;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TFieldIdEnum;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.checkerframework.checker.nullness.qual.NonNull;
+
+public class BeamKafkaThriftTable<FieldT extends TFieldIdEnum, T extends TBase<T, FieldT>>
+    extends BeamKafkaTable {
+  private final ThriftCoder<T> thriftCoder;
+  private final TypeDescriptor<T> typeDescriptor;
+
+  public BeamKafkaThriftTable(
+      @NonNull Schema requiredSchema,
+      @NonNull String bootstrapServers,
+      @NonNull List<String> topics,
+      @NonNull Class<T> thriftClass,
+      @NonNull TProtocolFactory protocolFactory) {
+    super(thriftSchema(thriftClass, requiredSchema), bootstrapServers, topics);
+    typeDescriptor = TypeDescriptor.of(thriftClass);
+    thriftCoder = ThriftCoder.of(thriftClass, protocolFactory);
+  }
+
+  private static Schema thriftSchema(
+      @NonNull Class<?> thriftClass, @NonNull Schema requiredSchema) {
+    @SuppressWarnings("nullness")

Review comment:
       Can this be written without suppressing nullness? The checker is sometimes awkward and requires some workarounds, but I think it should be possible.

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaThriftTable.java
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+import org.apache.beam.sdk.io.thrift.ThriftCoder;
+import org.apache.beam.sdk.io.thrift.ThriftSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaProvider;
+import org.apache.beam.sdk.schemas.transforms.Convert;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TFieldIdEnum;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.checkerframework.checker.nullness.qual.NonNull;
+
+public class BeamKafkaThriftTable<FieldT extends TFieldIdEnum, T extends TBase<T, FieldT>>
+    extends BeamKafkaTable {
+  private final ThriftCoder<T> thriftCoder;
+  private final TypeDescriptor<T> typeDescriptor;
+
+  public BeamKafkaThriftTable(
+      @NonNull Schema requiredSchema,
+      @NonNull String bootstrapServers,
+      @NonNull List<String> topics,
+      @NonNull Class<T> thriftClass,
+      @NonNull TProtocolFactory protocolFactory) {
+    super(thriftSchema(thriftClass, requiredSchema), bootstrapServers, topics);
+    typeDescriptor = TypeDescriptor.of(thriftClass);
+    thriftCoder = ThriftCoder.of(thriftClass, protocolFactory);
+  }
+
+  private static Schema thriftSchema(
+      @NonNull Class<?> thriftClass, @NonNull Schema requiredSchema) {
+    @SuppressWarnings("nullness")
+    final @NonNull Schema thriftSchema =
+        ThriftSchema.provider().schemaFor(TypeDescriptor.of(thriftClass));
+    if (!requiredSchema.equivalent(thriftSchema)) {
+      throw new IllegalArgumentException(
+          String.format(
+              "Given message schema: '%s'%n"
+                  + "does not match schema inferred from thrift class.%n"
+                  + "Thrift class: '%s'%n"
+                  + "Inferred schema: '%s'",
+              requiredSchema, thriftClass.getName(), thriftSchema));
+    }
+    return thriftSchema;
+  }
+
+  @Override
+  protected PTransform<PCollection<KV<byte[], byte[]>>, PCollection<Row>> getPTransformForInput() {
+    final @NonNull SchemaProvider schemaProvider = ThriftSchema.provider();
+    return new PTransform<PCollection<KV<byte[], byte[]>>, PCollection<Row>>() {
+      @Override
+      @SuppressWarnings("nullness")
+      public PCollection<Row> expand(PCollection<KV<byte[], byte[]>> input) {
+        return input
+            .apply(Values.create())
+            .apply(MapElements.into(typeDescriptor).via(BeamKafkaThriftTable.this::decode))

Review comment:
       Wouldn't `this::decode` be sufficient?

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaThriftTable.java
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+import org.apache.beam.sdk.io.thrift.ThriftCoder;
+import org.apache.beam.sdk.io.thrift.ThriftSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaProvider;
+import org.apache.beam.sdk.schemas.transforms.Convert;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TFieldIdEnum;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.checkerframework.checker.nullness.qual.NonNull;
+
+public class BeamKafkaThriftTable<FieldT extends TFieldIdEnum, T extends TBase<T, FieldT>>
+    extends BeamKafkaTable {
+  private final ThriftCoder<T> thriftCoder;
+  private final TypeDescriptor<T> typeDescriptor;
+
+  public BeamKafkaThriftTable(
+      @NonNull Schema requiredSchema,
+      @NonNull String bootstrapServers,
+      @NonNull List<String> topics,
+      @NonNull Class<T> thriftClass,
+      @NonNull TProtocolFactory protocolFactory) {
+    super(thriftSchema(thriftClass, requiredSchema), bootstrapServers, topics);
+    typeDescriptor = TypeDescriptor.of(thriftClass);
+    thriftCoder = ThriftCoder.of(thriftClass, protocolFactory);
+  }
+
+  private static Schema thriftSchema(
+      @NonNull Class<?> thriftClass, @NonNull Schema requiredSchema) {
+    @SuppressWarnings("nullness")
+    final @NonNull Schema thriftSchema =
+        ThriftSchema.provider().schemaFor(TypeDescriptor.of(thriftClass));
+    if (!requiredSchema.equivalent(thriftSchema)) {
+      throw new IllegalArgumentException(
+          String.format(
+              "Given message schema: '%s'%n"
+                  + "does not match schema inferred from thrift class.%n"
+                  + "Thrift class: '%s'%n"
+                  + "Inferred schema: '%s'",
+              requiredSchema, thriftClass.getName(), thriftSchema));
+    }
+    return thriftSchema;
+  }
+
+  @Override
+  protected PTransform<PCollection<KV<byte[], byte[]>>, PCollection<Row>> getPTransformForInput() {
+    final @NonNull SchemaProvider schemaProvider = ThriftSchema.provider();
+    return new PTransform<PCollection<KV<byte[], byte[]>>, PCollection<Row>>() {
+      @Override
+      @SuppressWarnings("nullness")
+      public PCollection<Row> expand(PCollection<KV<byte[], byte[]>> input) {
+        return input
+            .apply(Values.create())
+            .apply(MapElements.into(typeDescriptor).via(BeamKafkaThriftTable.this::decode))
+            .setSchema(
+                schema,
+                typeDescriptor,
+                schemaProvider.toRowFunction(typeDescriptor),
+                schemaProvider.fromRowFunction(typeDescriptor))
+            .apply(Convert.toRows());
+      }
+    };
+  }
+
+  private T decode(byte[] bytes) {
+    try {
+      return thriftCoder.decode(new ByteArrayInputStream(bytes));
+    } catch (IOException e) {
+      throw new IllegalStateException(e);

Review comment:
       Could you add a message to this exception?

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaThriftTable.java
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+import org.apache.beam.sdk.io.thrift.ThriftCoder;
+import org.apache.beam.sdk.io.thrift.ThriftSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaProvider;
+import org.apache.beam.sdk.schemas.transforms.Convert;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TFieldIdEnum;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.checkerframework.checker.nullness.qual.NonNull;
+
+public class BeamKafkaThriftTable<FieldT extends TFieldIdEnum, T extends TBase<T, FieldT>>
+    extends BeamKafkaTable {
+  private final ThriftCoder<T> thriftCoder;
+  private final TypeDescriptor<T> typeDescriptor;
+
+  public BeamKafkaThriftTable(
+      @NonNull Schema requiredSchema,
+      @NonNull String bootstrapServers,
+      @NonNull List<String> topics,
+      @NonNull Class<T> thriftClass,
+      @NonNull TProtocolFactory protocolFactory) {
+    super(thriftSchema(thriftClass, requiredSchema), bootstrapServers, topics);
+    typeDescriptor = TypeDescriptor.of(thriftClass);
+    thriftCoder = ThriftCoder.of(thriftClass, protocolFactory);
+  }
+
+  private static Schema thriftSchema(
+      @NonNull Class<?> thriftClass, @NonNull Schema requiredSchema) {
+    @SuppressWarnings("nullness")
+    final @NonNull Schema thriftSchema =
+        ThriftSchema.provider().schemaFor(TypeDescriptor.of(thriftClass));
+    if (!requiredSchema.equivalent(thriftSchema)) {
+      throw new IllegalArgumentException(
+          String.format(
+              "Given message schema: '%s'%n"
+                  + "does not match schema inferred from thrift class.%n"
+                  + "Thrift class: '%s'%n"
+                  + "Inferred schema: '%s'",
+              requiredSchema, thriftClass.getName(), thriftSchema));
+    }
+    return thriftSchema;
+  }
+
+  @Override
+  protected PTransform<PCollection<KV<byte[], byte[]>>, PCollection<Row>> getPTransformForInput() {
+    final @NonNull SchemaProvider schemaProvider = ThriftSchema.provider();
+    return new PTransform<PCollection<KV<byte[], byte[]>>, PCollection<Row>>() {
+      @Override
+      @SuppressWarnings("nullness")

Review comment:
       I'd try to get rid of this nullness suppression. In my experience, something like `if (var != null) { ... } else { throw new NullPointerException("var was null"); }` is unfortunately needed to achieve that (maybe there is a cleaner way that I don't know about).

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaThriftTable.java
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+import org.apache.beam.sdk.io.thrift.ThriftCoder;
+import org.apache.beam.sdk.io.thrift.ThriftSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaProvider;
+import org.apache.beam.sdk.schemas.transforms.Convert;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TFieldIdEnum;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.checkerframework.checker.nullness.qual.NonNull;
+
+public class BeamKafkaThriftTable<FieldT extends TFieldIdEnum, T extends TBase<T, FieldT>>
+    extends BeamKafkaTable {
+  private final ThriftCoder<T> thriftCoder;
+  private final TypeDescriptor<T> typeDescriptor;
+
+  public BeamKafkaThriftTable(
+      @NonNull Schema requiredSchema,

Review comment:
       I may be wrong, but I think the `@NonNull` annotations are redundant, since non-null is the checker's default and only `@Nullable` params are worth annotating. The same applies below.

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaThriftTable.java
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+import org.apache.beam.sdk.io.thrift.ThriftCoder;
+import org.apache.beam.sdk.io.thrift.ThriftSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaProvider;
+import org.apache.beam.sdk.schemas.transforms.Convert;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TFieldIdEnum;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.checkerframework.checker.nullness.qual.NonNull;
+
+public class BeamKafkaThriftTable<FieldT extends TFieldIdEnum, T extends TBase<T, FieldT>>
+    extends BeamKafkaTable {
+  private final ThriftCoder<T> thriftCoder;
+  private final TypeDescriptor<T> typeDescriptor;
+
+  public BeamKafkaThriftTable(
+      @NonNull Schema requiredSchema,
+      @NonNull String bootstrapServers,
+      @NonNull List<String> topics,
+      @NonNull Class<T> thriftClass,
+      @NonNull TProtocolFactory protocolFactory) {
+    super(thriftSchema(thriftClass, requiredSchema), bootstrapServers, topics);
+    typeDescriptor = TypeDescriptor.of(thriftClass);
+    thriftCoder = ThriftCoder.of(thriftClass, protocolFactory);
+  }
+
+  private static Schema thriftSchema(
+      @NonNull Class<?> thriftClass, @NonNull Schema requiredSchema) {
+    @SuppressWarnings("nullness")
+    final @NonNull Schema thriftSchema =
+        ThriftSchema.provider().schemaFor(TypeDescriptor.of(thriftClass));
+    if (!requiredSchema.equivalent(thriftSchema)) {
+      throw new IllegalArgumentException(
+          String.format(
+              "Given message schema: '%s'%n"
+                  + "does not match schema inferred from thrift class.%n"
+                  + "Thrift class: '%s'%n"
+                  + "Inferred schema: '%s'",
+              requiredSchema, thriftClass.getName(), thriftSchema));
+    }
+    return thriftSchema;
+  }
+
+  @Override
+  protected PTransform<PCollection<KV<byte[], byte[]>>, PCollection<Row>> getPTransformForInput() {
+    final @NonNull SchemaProvider schemaProvider = ThriftSchema.provider();
+    return new PTransform<PCollection<KV<byte[], byte[]>>, PCollection<Row>>() {
+      @Override
+      @SuppressWarnings("nullness")
+      public PCollection<Row> expand(PCollection<KV<byte[], byte[]>> input) {
+        return input
+            .apply(Values.create())
+            .apply(MapElements.into(typeDescriptor).via(BeamKafkaThriftTable.this::decode))
+            .setSchema(
+                schema,
+                typeDescriptor,
+                schemaProvider.toRowFunction(typeDescriptor),
+                schemaProvider.fromRowFunction(typeDescriptor))
+            .apply(Convert.toRows());
+      }
+    };
+  }
+
+  private T decode(byte[] bytes) {
+    try {
+      return thriftCoder.decode(new ByteArrayInputStream(bytes));
+    } catch (IOException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  @Override
+  protected PTransform<PCollection<Row>, PCollection<KV<byte[], byte[]>>> getPTransformForOutput() {
+    final byte[] emptyKey = {};
+    final TypeDescriptor<byte[]> binTypeDescriptor = TypeDescriptor.of(byte[].class);
+    return new PTransform<PCollection<Row>, PCollection<KV<byte[], byte[]>>>() {
+      @Override
+      public PCollection<KV<byte[], byte[]>> expand(PCollection<Row> input) {
+        return input
+            .apply(Convert.fromRows(typeDescriptor))
+            .apply(
+                MapElements.into(TypeDescriptors.kvs(binTypeDescriptor, binTypeDescriptor))
+                    .via(thrift -> KV.of(emptyKey, encode(thrift))));
+      }
+    };
+  }
+
+  private byte[] encode(T thrift) {
+    final ByteArrayOutputStream out = new ByteArrayOutputStream();
+    try {
+      thriftCoder.encode(thrift, out);
+      return out.toByteArray();
+    } catch (IOException e) {
+      throw new IllegalStateException(e);

Review comment:
       Please add a message to the exception.

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaThriftTable.java
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+import org.apache.beam.sdk.io.thrift.ThriftCoder;
+import org.apache.beam.sdk.io.thrift.ThriftSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaProvider;
+import org.apache.beam.sdk.schemas.transforms.Convert;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TFieldIdEnum;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.checkerframework.checker.nullness.qual.NonNull;
+
+public class BeamKafkaThriftTable<FieldT extends TFieldIdEnum, T extends TBase<T, FieldT>>
+    extends BeamKafkaTable {
+  private final ThriftCoder<T> thriftCoder;
+  private final TypeDescriptor<T> typeDescriptor;
+
+  public BeamKafkaThriftTable(
+      @NonNull Schema requiredSchema,
+      @NonNull String bootstrapServers,
+      @NonNull List<String> topics,
+      @NonNull Class<T> thriftClass,
+      @NonNull TProtocolFactory protocolFactory) {
+    super(thriftSchema(thriftClass, requiredSchema), bootstrapServers, topics);
+    typeDescriptor = TypeDescriptor.of(thriftClass);
+    thriftCoder = ThriftCoder.of(thriftClass, protocolFactory);
+  }
+
+  private static Schema thriftSchema(
+      @NonNull Class<?> thriftClass, @NonNull Schema requiredSchema) {
+    @SuppressWarnings("nullness")
+    final @NonNull Schema thriftSchema =

Review comment:
       As with the other `@NonNull` annotations: when a param is not `@Nullable`, we already assume it can't be null.

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProvider.java
##########
@@ -85,6 +89,33 @@ public BeamSqlTable buildBeamSqlTable(Table table) {
         } catch (ClassNotFoundException e) {
           throw new IllegalArgumentException("Incorrect proto class provided: " + protoClassName);
         }
+      case THRIFT:
+        final String thriftClassName = properties.getString("thriftClass");

Review comment:
       I'd move this part of the code into a separate function. The switch statement will get a bit messy once we add another case.

##########
File path: sdks/java/io/thrift/src/main/java/org/apache/beam/sdk/io/thrift/ThriftSchema.java
##########
@@ -183,7 +184,12 @@ private Schema schemaFor(Class<?> targetClass) {
   private Schema.Field beamField(FieldMetaData fieldDescriptor) {
     try {
       final FieldType type = beamType(fieldDescriptor.valueMetaData);
-      return Schema.Field.nullable(fieldDescriptor.fieldName, type);
+      switch (fieldDescriptor.requirementType) {

Review comment:
       An `if {} else {}` or `return fieldDescriptor.requirementType == REQUIRED ? ... : ...` would be neater, as there is only one case.

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProvider.java
##########
@@ -85,6 +89,33 @@ public BeamSqlTable buildBeamSqlTable(Table table) {
         } catch (ClassNotFoundException e) {
           throw new IllegalArgumentException("Incorrect proto class provided: " + protoClassName);
         }
+      case THRIFT:
+        final String thriftClassName = properties.getString("thriftClass");
+        final String thriftProtocolFactoryClassName =
+            properties.getString("thriftProtocolFactoryClass");
+        try {
+          final Class<TBase> thriftClass = (Class<TBase>) Class.forName(thriftClassName);
+          final TProtocolFactory thriftProtocolFactory;
+          try {
+            final Class<TProtocolFactory> thriftProtocolFactoryClass =
+                (Class<TProtocolFactory>) Class.forName(thriftProtocolFactoryClassName);
+            thriftProtocolFactory =
+                thriftProtocolFactoryClass.getDeclaredConstructor().newInstance();
+          } catch (ClassNotFoundException e) {
+            throw new IllegalArgumentException(
+                "Incorrect thrift protocol factory class provided: "
+                    + thriftProtocolFactoryClassName);
+          } catch (InstantiationException
+              | IllegalAccessException
+              | InvocationTargetException
+              | NoSuchMethodException e) {
+            throw new IllegalStateException(e);

Review comment:
       Please add a message here as well.

##########
File path: sdks/java/extensions/sql/src/test/thrift/kafka/message.thrift
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+thrift --gen java:private-members \

Review comment:
       +1 for this comment - it'll be helpful :)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org