Posted to issues@flink.apache.org by "Aljoscha Krettek (JIRA)" <ji...@apache.org> on 2018/02/09 11:17:00 UTC
[jira] [Reopened] (FLINK-8619) Some thing about Flink SQL distinct, I need help
[ https://issues.apache.org/jira/browse/FLINK-8619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Aljoscha Krettek reopened FLINK-8619:
-------------------------------------
Reopen to remove fix version
> Some thing about Flink SQL distinct, I need help
> ------------------------------------------------
>
> Key: FLINK-8619
> URL: https://issues.apache.org/jira/browse/FLINK-8619
> Project: Flink
> Issue Type: Wish
> Components: Table API & SQL
> Affects Versions: 1.4.0
> Reporter: Lynch Lee
> Priority: Major
> Fix For: 1.4.0
>
>
> I ran some tests with DISTINCT on MySQL:
>
>
> mysql> CREATE TABLE `rpt_tt` (
> -> `target_id` varchar(50) NOT NULL DEFAULT '',
> -> `target_type` varchar(50) NOT NULL DEFAULT '',
> -> `amt_pay` bigint(20) DEFAULT NULL,
> -> `down_payment` bigint(20) DEFAULT NULL,
> -> PRIMARY KEY (`target_id`,`target_type`,`amt_pay`)
> -> ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
> Query OK, 0 rows affected (0.01 sec)
>
> mysql> insert into rpt_tt values("1","5","1","1");
> Query OK, 1 row affected (0.00 sec)
>
> mysql> insert into rpt_tt values("3","5","1","1");
> Query OK, 1 row affected (0.00 sec)
>
> mysql> insert into rpt_tt values("2","6","1","1");
> Query OK, 1 row affected (0.00 sec)
>
> mysql> insert into rpt_tt values("3","7","1","1");
> Query OK, 1 row affected (0.00 sec)
>
> mysql> select distinct(target_type),target_id,amt_pay,down_payment from rpt_tt;
> +-------------+-----------+---------+--------------+
> | target_type | target_id | amt_pay | down_payment |
> +-------------+-----------+---------+--------------+
> | 5           | 1         |       1 |            1 |
> | 6           | 2         |       1 |            1 |
> | 5           | 3         |       1 |            1 |
> | 7           | 3         |       1 |            1 |
> +-------------+-----------+---------+--------------+
> 4 rows in set (0.00 sec)
>
> mysql> select distinct(target_type),target_id,amt_pay,down_payment from rpt_tt group by target_type;
> +-------------+-----------+---------+--------------+
> | target_type | target_id | amt_pay | down_payment |
> +-------------+-----------+---------+--------------+
> | 5           | 1         |       1 |            1 |
> | 6           | 2         |       1 |            1 |
> | 7           | 3         |       1 |            1 |
> +-------------+-----------+---------+--------------+
> 3 rows in set (0.00 sec)
>
> mysql> select distinct(target_type),target_id,amt_pay,down_payment from rpt_tt group by target_type,target_id,amt_pay,down_payment;
> +-------------+-----------+---------+--------------+
> | target_type | target_id | amt_pay | down_payment |
> +-------------+-----------+---------+--------------+
> | 5           | 1         |       1 |            1 |
> | 5           | 3         |       1 |            1 |
> | 6           | 2         |       1 |            1 |
> | 7           | 3         |       1 |            1 |
> +-------------+-----------+---------+--------------+
> 4 rows in set (0.01 sec)
>
> Now I want to run the same kind of query with Flink SQL. Here is my code:
> import com.fasterxml.jackson.databind.DeserializationFeature;
> import com.fasterxml.jackson.databind.JsonNode;
> import com.fasterxml.jackson.databind.ObjectMapper;
> import com.fasterxml.jackson.databind.node.JsonNodeFactory;
> import com.fasterxml.jackson.databind.node.ObjectNode;
> import com.god.hala.flink.convertors.RowIntoJson;
> import com.god.hala.flink.sources.DataSources;
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.api.common.restartstrategy.RestartStrategies;
> import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
> import org.apache.flink.api.common.time.Time;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.api.java.typeutils.RowTypeInfo;
> import org.apache.flink.streaming.api.CheckpointingMode;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.ProcessFunction;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
> import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
> import org.apache.flink.table.api.StreamQueryConfig;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.Types;
> import org.apache.flink.table.api.java.StreamTableEnvironment;
> import org.apache.flink.types.Row;
> import org.apache.flink.util.Collector;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
> import java.nio.charset.Charset;
> import java.util.Properties;
> import java.util.UUID;
> public class KafkaConn2Topics1 {
>
>     public static void main(String[] args) throws Exception {
>         String inputTopic = "input-case01-test02";
>         String outputTopic = "output-case01-test02";
>
>         Properties props = new Properties();
>         props.setProperty("bootstrap.servers", "data-node5:9092");
>         props.setProperty("group.id", UUID.randomUUID().toString().replaceAll("-", ""));
>
>         LocalStreamEnvironment streamEnv = StreamExecutionEnvironment.createLocalEnvironment();
>         streamEnv.setParallelism(1);
>         streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>         streamEnv.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
>
>         StreamTableEnvironment tableEnv = StreamTableEnvironment.getTableEnvironment(streamEnv);
>         StreamQueryConfig qConfig = tableEnv.queryConfig();
>         qConfig.withIdleStateRetentionTime(Time.seconds(0));
>
>         streamEnv.getConfig().enableSysoutLogging();
>         streamEnv.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
>
>         RowTypeInfo rowSchema = new RowTypeInfo(
>                 new TypeInformation[]{Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING()},
>                 new String[]{"target_type", "target_id", "amt_pay", "down_payment"}
>         );
>
>         DataStream<Row> _inDataStream = streamEnv.addSource(kafkaSource(inputTopic, props))
>                 .map(new JsonIntoRow(rowSchema))
>                 .returns(rowSchema);
>
>         final String _table = "table_" + UUID.randomUUID().toString().replaceAll("-", "");
>         tableEnv.registerDataStream(_table, _inDataStream);
>
>         final String _in_fields = " target_id, amt_pay, down_payment";
>         String sql = "select distinct(target_type)," + _in_fields + " from " + _table + " group by target_type";
>         System.out.println(sql);
>         Table resultTable = tableEnv.sql(sql);
>
>         DataStream<Row> _outStream =
>                 tableEnv.toRetractStream(resultTable, Row.class, qConfig)
>                         .process(new ProcessFunction<Tuple2<Boolean, Row>, Row>() {
>                             @Override
>                             public void processElement(Tuple2<Boolean, Row> value, Context ctx, Collector<Row> out) throws Exception {
>                                 ObjectNode node = new RowIntoJson(rowSchema).run(value.f1);
>                                 System.out.println("out1 row: " + node.toString());
>                                 if (value.f0) {
>                                     out.collect(value.f1);
>                                     ObjectNode node1 = new RowIntoJson(rowSchema).run(value.f1);
>                                     System.out.println("out11 row: " + node1.toString());
>                                 }
>                             }
>                         })
>                         .map(new MapFunction<Row, Row>() {
>                             @Override
>                             public Row map(Row value) throws Exception {
>                                 ObjectNode node = new RowIntoJson(rowSchema).run(value);
>                                 System.out.println("out2 row: " + node.toString());
>                                 return value;
>                             }
>                         }).name("result-pickout1-source2")
>                         .returns(rowSchema);
>
>         _outStream.addSink(kafkaProducerJsonRow(outputTopic, props, rowSchema));
>         streamEnv.execute(UUID.randomUUID().toString());
>     }
>
>     private static FlinkKafkaProducer010<Row> kafkaProducerJsonRow(String outputTopic, Properties props, RowTypeInfo rowSchema) {
>         return new FlinkKafkaProducer010<>(outputTopic, new JsonRowSerializationSchema(rowSchema), props);
>     }
>
>     private static FlinkKafkaConsumer010<ObjectNode> kafkaSource(String inputTopic, Properties props) {
>         return new FlinkKafkaConsumer010<>(inputTopic, new MyJson2ObjectNodeDeser(), props);
>     }
>
>     public static class MyJson2ObjectNodeDeser extends AbstractDeserializationSchema<ObjectNode> {
>         private static final Logger LOGGER = LoggerFactory.getLogger(DataSources.MyJson2ObjectNodeDeser.class);
>         private static ObjectMapper mapper = new ObjectMapper();
>
>         static {
>             mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
>             mapper.configure(DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES, false);
>             mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, true);
>         }
>
>         @Override
>         public ObjectNode deserialize(byte[] message) {
>             if (mapper == null) {
>                 mapper = new ObjectMapper();
>             }
>             try {
>                 ObjectNode jsonNode = mapper.readValue(message, ObjectNode.class);
>                 LOGGER.info("source data:{}", jsonNode);
>                 if (!jsonNode.has("record")) {
>                     LOGGER.warn("not required section[record] found, pass, received:{}", jsonNode);
>                     return JsonNodeFactory.instance.objectNode();
>                 }
>                 JsonNode record = jsonNode.get("record");
>                 if (!record.isObject()) {
>                     LOGGER.warn("value of section[record] should be Object. pls check your input:{}", jsonNode);
>                     return JsonNodeFactory.instance.objectNode();
>                 }
>                 LOGGER.info("record data:{}", record);
>                 System.out.println("record data: " + record.toString());
>                 return (ObjectNode) record;
>             } catch (Exception e) {
>                 LOGGER.warn("ETL clean up fail for source stream data, pls check your data schema. fail over. data received: {}", new String(message, Charset.forName("UTF-8")), e);
>             }
>             return null;
>         }
>
>         @Override
>         public boolean isEndOfStream(ObjectNode nextElement) {
>             return false;
>         }
>     }
> }
>
> I get the following error when I start the job:
> Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 31 to line 1, column 39: Expression 'target_id' is not being grouped
> at org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:93)
> at org.apache.flink.table.api.TableEnvironment.sqlQuery(TableEnvironment.scala:561)
> at org.apache.flink.table.api.TableEnvironment.sql(TableEnvironment.scala:535)
> at com.god.hala.flink.jobs.KafkaConn2Topics.main(KafkaConn2Topics.java:86)
> Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, column 31 to line 1, column 39: Expression 'target_id' is not being grouped
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:803)
> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:788)
> at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4654)
> at org.apache.calcite.sql.validate.AggChecker.visit(AggChecker.java:117)
> at org.apache.calcite.sql.validate.AggChecker.visit(AggChecker.java:41)
> at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:344)
> at org.apache.calcite.sql.validate.AggregatingSelectScope.checkAggregateExpr(AggregatingSelectScope.java:231)
> at org.apache.calcite.sql.validate.AggregatingSelectScope.validateExpr(AggregatingSelectScope.java:240)
> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateExpr(SqlValidatorImpl.java:4016)
> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:3989)
> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3218)
> at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
> at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:945)
> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:926)
> at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:226)
> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:901)
> at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:611)
> at org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:89)
> ... 3 more
> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Expression 'target_id' is not being grouped
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
> at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:572)
> ... 22 more
>
> I need help with two things:
> 1. Could someone point out what I am missing?
> 2. If I want the same result as the MySQL query below, how should I change my code?
> mysql> select distinct(target_type),target_id,amt_pay,down_payment from rpt_tt group by target_type;
> +-------------+-----------+---------+--------------+
> | target_type | target_id | amt_pay | down_payment |
> +-------------+-----------+---------+--------------+
> | 5           | 1         |       1 |            1 |
> | 6           | 2         |       1 |            1 |
> | 7           | 3         |       1 |            1 |
> +-------------+-----------+---------+--------------+
> 3 rows in set (0.00 sec)
>
> Many thanks!
>
>
>
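On the validation error quoted above: in standard SQL, DISTINCT applies to the whole select list, so `distinct(target_type)` is not a per-column function, and every non-aggregated column in the select list has to appear in GROUP BY. MySQL accepts the `group by target_type` query only when ONLY_FULL_GROUP_BY is disabled, and then returns values from an arbitrary row of each group. Calcite, which performs the validation in the stack trace, enforces the standard rule. One way to get a single row per target_type would probably be to aggregate the remaining columns explicitly, for example `SELECT target_type, MIN(target_id), MIN(amt_pay), MIN(down_payment) FROM <table> GROUP BY target_type`. This has not been verified against Flink 1.4 and assumes MIN is supported there for STRING columns. The plain-Java sketch below involves no Flink at all; the class and method names are made up for illustration. It only mimics the two semantics on the four sample rows from the report.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration class, not part of Flink or of the reported job.
public class DistinctVsGroupBy {

    // Sample rows from the report, as (target_type, target_id, amt_pay, down_payment).
    static final List<List<String>> SAMPLE_ROWS = Arrays.asList(
            Arrays.asList("5", "1", "1", "1"),
            Arrays.asList("5", "3", "1", "1"),
            Arrays.asList("6", "2", "1", "1"),
            Arrays.asList("7", "3", "1", "1"));

    // Mimics SELECT DISTINCT a, b, c, d: duplicates are removed by comparing whole rows.
    static List<List<String>> distinctRows(List<List<String>> rows) {
        return new ArrayList<>(new LinkedHashSet<>(rows));
    }

    // Mimics SELECT a, MIN(b), MIN(c), MIN(d) ... GROUP BY a: one row per key,
    // with every other column reduced by an explicit aggregate (string MIN here).
    static List<List<String>> groupByFirstColumnMin(List<List<String>> rows) {
        Map<String, List<String>> groups = new TreeMap<>();
        for (List<String> row : rows) {
            groups.merge(row.get(0), row, (left, right) -> {
                List<String> merged = new ArrayList<>();
                for (int i = 0; i < left.size(); i++) {
                    String l = left.get(i);
                    String r = right.get(i);
                    merged.add(l.compareTo(r) <= 0 ? l : r);
                }
                return merged;
            });
        }
        return new ArrayList<>(groups.values());
    }

    public static void main(String[] args) {
        System.out.println(distinctRows(SAMPLE_ROWS).size());
        System.out.println(groupByFirstColumnMin(SAMPLE_ROWS));
    }
}
```

On the sample data, whole-row DISTINCT keeps all four rows, while grouping by the first column with MIN on the others yields three rows, which matches the row counts of the MySQL queries in the report. That MIN reproduces the exact MySQL values here is a coincidence of this data set, since MySQL does not define which row it picks.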
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)