You are viewing a plain text version of this content. The canonical link to the original message was not preserved in this plain text copy.
Posted to commits@flink.apache.org by sh...@apache.org on 2022/07/29 01:59:11 UTC
[flink] 02/04: [FLINK-28152][sql-gateway] Simplify SessionHandle and OperationHandle
This is an automated email from the ASF dual-hosted git repository.
shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1fb5875ae6e2536266275c28e6260019706c28f2
Author: Shengkai <10...@qq.com>
AuthorDate: Sun Jul 24 14:09:25 2022 +0800
[FLINK-28152][sql-gateway] Simplify SessionHandle and OperationHandle
---
.../hive/util/ThriftObjectConversions.java | 30 +++++-----
.../flink/table/gateway/api/HandleIdentifier.java | 67 ----------------------
.../gateway/api/operation/OperationHandle.java | 9 ++-
.../table/gateway/api/session/SessionHandle.java | 9 ++-
4 files changed, 22 insertions(+), 93 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java
index 228aa316758..49ece510191 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java
@@ -18,7 +18,6 @@
package org.apache.flink.table.endpoint.hive.util;
-import org.apache.flink.table.gateway.api.HandleIdentifier;
import org.apache.flink.table.gateway.api.session.SessionHandle;
import org.apache.hive.service.rpc.thrift.THandleIdentifier;
@@ -34,12 +33,19 @@ import java.util.UUID;
/** Conversion between thrift object and flink object. */
public class ThriftObjectConversions {
+ private static final UUID SECRET_ID = UUID.fromString("b06fa16a-3d16-475f-b510-6c64abb9b173");
+
+ // --------------------------------------------------------------------------------------------
+ // Flink SessionHandle from/to Hive SessionHandle
+ // --------------------------------------------------------------------------------------------
+
public static TSessionHandle toTSessionHandle(SessionHandle sessionHandle) {
return new TSessionHandle(toTHandleIdentifier(sessionHandle.getIdentifier()));
}
public static SessionHandle toSessionHandle(TSessionHandle tSessionHandle) {
- return new SessionHandle(toHandleIdentifier(tSessionHandle.getSessionId()));
+ ByteBuffer bb = ByteBuffer.wrap(tSessionHandle.getSessionId().getGuid());
+ return new SessionHandle(new UUID(bb.getLong(), bb.getLong()));
}
public static TStatus toTStatus(Throwable t) {
@@ -51,27 +57,19 @@ public class ThriftObjectConversions {
// --------------------------------------------------------------------------------------------
- private static THandleIdentifier toTHandleIdentifier(HandleIdentifier identifier) {
+ private static THandleIdentifier toTHandleIdentifier(UUID publicId) {
byte[] guid = new byte[16];
byte[] secret = new byte[16];
ByteBuffer guidBB = ByteBuffer.wrap(guid);
ByteBuffer secretBB = ByteBuffer.wrap(secret);
- guidBB.putLong(identifier.getPublicId().getMostSignificantBits());
- guidBB.putLong(identifier.getPublicId().getLeastSignificantBits());
- secretBB.putLong(identifier.getSecretId().getMostSignificantBits());
- secretBB.putLong(identifier.getSecretId().getLeastSignificantBits());
+ guidBB.putLong(publicId.getMostSignificantBits());
+ guidBB.putLong(publicId.getLeastSignificantBits());
+ secretBB.putLong(SECRET_ID.getMostSignificantBits());
+ secretBB.putLong(SECRET_ID.getLeastSignificantBits());
return new THandleIdentifier(ByteBuffer.wrap(guid), ByteBuffer.wrap(secret));
}
- private static HandleIdentifier toHandleIdentifier(THandleIdentifier tHandleId) {
- ByteBuffer bb = ByteBuffer.wrap(tHandleId.getGuid());
- UUID publicId = new UUID(bb.getLong(), bb.getLong());
- bb = ByteBuffer.wrap(tHandleId.getSecret());
- UUID secretId = new UUID(bb.getLong(), bb.getLong());
- return new HandleIdentifier(publicId, secretId);
- }
-
/**
* Converts a {@link Throwable} object into a flattened list of texts including its stack trace
* and the stack traces of the nested causes.
@@ -103,7 +101,7 @@ public class ThriftObjectConversions {
}
private static List<String> enroll(Throwable ex, StackTraceElement[] trace, int max) {
- List<String> details = new ArrayList<String>();
+ List<String> details = new ArrayList<>();
StringBuilder builder = new StringBuilder();
builder.append('*').append(ex.getClass().getName()).append(':');
builder.append(ex.getMessage()).append(':');
diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/HandleIdentifier.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/HandleIdentifier.java
deleted file mode 100644
index cf6feb68013..00000000000
--- a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/HandleIdentifier.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.gateway.api;
-
-import org.apache.flink.annotation.PublicEvolving;
-
-import java.util.Objects;
-import java.util.UUID;
-
-/** Identifiers for Handle. */
-@PublicEvolving
-public class HandleIdentifier {
-
- private final UUID publicId;
- private final UUID secretId;
-
- public HandleIdentifier(UUID publicId, UUID secretId) {
- this.publicId = publicId;
- this.secretId = secretId;
- }
-
- public UUID getPublicId() {
- return publicId;
- }
-
- public UUID getSecretId() {
- return secretId;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (!(o instanceof HandleIdentifier)) {
- return false;
- }
- HandleIdentifier that = (HandleIdentifier) o;
- return Objects.equals(publicId, that.publicId) && Objects.equals(secretId, that.secretId);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(publicId, secretId);
- }
-
- @Override
- public String toString() {
- return publicId.toString();
- }
-}
diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/operation/OperationHandle.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/operation/OperationHandle.java
index dfdb3a6b173..fd63219d691 100644
--- a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/operation/OperationHandle.java
+++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/operation/OperationHandle.java
@@ -19,7 +19,6 @@
package org.apache.flink.table.gateway.api.operation;
import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.table.gateway.api.HandleIdentifier;
import java.util.Objects;
import java.util.UUID;
@@ -28,17 +27,17 @@ import java.util.UUID;
@PublicEvolving
public class OperationHandle {
- private final HandleIdentifier identifier;
+ private final UUID identifier;
public static OperationHandle create() {
- return new OperationHandle(new HandleIdentifier(UUID.randomUUID(), UUID.randomUUID()));
+ return new OperationHandle(UUID.randomUUID());
}
- public OperationHandle(HandleIdentifier identifier) {
+ public OperationHandle(UUID identifier) {
this.identifier = identifier;
}
- public HandleIdentifier getIdentifier() {
+ public UUID getIdentifier() {
return identifier;
}
diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/session/SessionHandle.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/session/SessionHandle.java
index 4a5a2623044..19deabf3500 100644
--- a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/session/SessionHandle.java
+++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/session/SessionHandle.java
@@ -19,7 +19,6 @@
package org.apache.flink.table.gateway.api.session;
import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.table.gateway.api.HandleIdentifier;
import java.util.Objects;
import java.util.UUID;
@@ -28,17 +27,17 @@ import java.util.UUID;
@PublicEvolving
public class SessionHandle {
- private final HandleIdentifier identifier;
+ private final UUID identifier;
public static SessionHandle create() {
- return new SessionHandle(new HandleIdentifier(UUID.randomUUID(), UUID.randomUUID()));
+ return new SessionHandle(UUID.randomUUID());
}
- public SessionHandle(HandleIdentifier identifier) {
+ public SessionHandle(UUID identifier) {
this.identifier = identifier;
}
- public HandleIdentifier getIdentifier() {
+ public UUID getIdentifier() {
return identifier;
}