Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/25 14:31:26 UTC

[GitHub] [flink] wuchong commented on a diff in pull request #20298: [FLINK-28152][sql-gateway][hive] Allow executing statement for the HiveServer2Endpoint

wuchong commented on code in PR #20298:
URL: https://github.com/apache/flink/pull/20298#discussion_r928809112


##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java:
##########
@@ -196,7 +214,8 @@ public HiveServer2Endpoint(
             @Nullable String hiveConfPath,
             @Nullable String defaultDatabase,
             String moduleName,
-            boolean allowEmbedded) {
+            boolean allowEmbedded,
+            boolean isVerbose) {
         this.service = service;
 
         this.port = port;

Review Comment:
   The thrift bind host should also be configurable; otherwise, the bound host is not deterministic when the node has multiple network interfaces. See `hive.server2.thrift.bind.host`.
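
   For illustration, a minimal sketch of how a configurable bind host could be wired in. The class and method names here are assumptions for the sketch, not code from this PR; `TServerSocket` is the real libthrift API:

   ```java
   import org.apache.thrift.transport.TServerSocket;
   import org.apache.thrift.transport.TTransportException;

   import java.net.InetSocketAddress;

   class BindHostSketch {
       // Binding to an explicit host keeps the listening address deterministic
       // on multi-homed machines, instead of falling back to the wildcard address.
       static TServerSocket createServerSocket(String bindHost, int port)
               throws TTransportException {
           return new TServerSocket(new InetSocketAddress(bindHost, port));
       }
   }
   ```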



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java:
##########
@@ -18,28 +18,238 @@
 
 package org.apache.flink.table.endpoint.hive.util;
 
-import org.apache.flink.table.gateway.api.HandleIdentifier;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import org.apache.flink.table.gateway.api.operation.OperationStatus;
+import org.apache.flink.table.gateway.api.operation.OperationType;
+import org.apache.flink.table.gateway.api.results.FetchOrientation;
 import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.utils.print.RowDataToStringConverter;
+import org.apache.flink.types.RowKind;
 
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.thrift.Type;
+import org.apache.hive.service.rpc.thrift.TBinaryColumn;
+import org.apache.hive.service.rpc.thrift.TBoolColumn;
+import org.apache.hive.service.rpc.thrift.TBoolValue;
+import org.apache.hive.service.rpc.thrift.TByteColumn;
+import org.apache.hive.service.rpc.thrift.TByteValue;
+import org.apache.hive.service.rpc.thrift.TCLIServiceConstants;
+import org.apache.hive.service.rpc.thrift.TColumn;
+import org.apache.hive.service.rpc.thrift.TColumnDesc;
+import org.apache.hive.service.rpc.thrift.TColumnValue;
+import org.apache.hive.service.rpc.thrift.TDoubleColumn;
+import org.apache.hive.service.rpc.thrift.TDoubleValue;
+import org.apache.hive.service.rpc.thrift.TFetchOrientation;
 import org.apache.hive.service.rpc.thrift.THandleIdentifier;
+import org.apache.hive.service.rpc.thrift.TI16Column;
+import org.apache.hive.service.rpc.thrift.TI16Value;
+import org.apache.hive.service.rpc.thrift.TI32Column;
+import org.apache.hive.service.rpc.thrift.TI32Value;
+import org.apache.hive.service.rpc.thrift.TI64Column;
+import org.apache.hive.service.rpc.thrift.TI64Value;
+import org.apache.hive.service.rpc.thrift.TOperationHandle;
+import org.apache.hive.service.rpc.thrift.TOperationState;
+import org.apache.hive.service.rpc.thrift.TOperationType;
+import org.apache.hive.service.rpc.thrift.TPrimitiveTypeEntry;
+import org.apache.hive.service.rpc.thrift.TProtocolVersion;
+import org.apache.hive.service.rpc.thrift.TRow;
+import org.apache.hive.service.rpc.thrift.TRowSet;
 import org.apache.hive.service.rpc.thrift.TSessionHandle;
 import org.apache.hive.service.rpc.thrift.TStatus;
 import org.apache.hive.service.rpc.thrift.TStatusCode;
+import org.apache.hive.service.rpc.thrift.TStringColumn;
+import org.apache.hive.service.rpc.thrift.TStringValue;
+import org.apache.hive.service.rpc.thrift.TTableSchema;
+import org.apache.hive.service.rpc.thrift.TTypeDesc;
+import org.apache.hive.service.rpc.thrift.TTypeEntry;
+import org.apache.hive.service.rpc.thrift.TTypeQualifierValue;
+import org.apache.hive.service.rpc.thrift.TTypeQualifiers;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointVersion.HIVE_CLI_SERVICE_PROTOCOL_V6;
 
 /** Conversion between thrift object and flink object. */
 public class ThriftObjectConversions {
 
+    private static final UUID SECRET_ID = UUID.fromString("b06fa16a-3d16-475f-b510-6c64abb9b173");
+
+    // --------------------------------------------------------------------------------------------
+    // Flink SessionHandle from/to Hive SessionHandle
+    // --------------------------------------------------------------------------------------------
+
     public static TSessionHandle toTSessionHandle(SessionHandle sessionHandle) {
-        return new TSessionHandle(toTHandleIdentifier(sessionHandle.getIdentifier()));
+        return new TSessionHandle(toTHandleIdentifier(sessionHandle.getIdentifier(), SECRET_ID));
     }
 
     public static SessionHandle toSessionHandle(TSessionHandle tSessionHandle) {
-        return new SessionHandle(toHandleIdentifier(tSessionHandle.getSessionId()));
+        ByteBuffer bb = ByteBuffer.wrap(tSessionHandle.getSessionId().getGuid());
+        return new SessionHandle(new UUID(bb.getLong(), bb.getLong()));
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Flink SessionHandle && OperationHandle from/to Hive OperationHandle
+    // --------------------------------------------------------------------------------------------
+
+    public static TOperationHandle toTOperationHandle(
+            SessionHandle sessionHandle,
+            OperationHandle operationHandle,
+            OperationType operationType,
+            boolean hasResult) {
+        return new TOperationHandle(
+                toTHandleIdentifier(sessionHandle.getIdentifier(), operationHandle.getIdentifier()),

Review Comment:
   1. Please add a comment explaining why we use the session identifier as the publicId of the Hive operation handle.
   2. Maybe we should switch the order: session identifier as the secretId, and operation identifier as the publicId? We use the Flink identifier as the Hive publicId in the session handle conversion.
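
   For reference, a sketch of the swapped ordering (operation identifier as publicId, session identifier as secretId), reusing the helpers already in this file; this illustrates the suggestion, it is not the code in the PR:

   ```java
   public static TOperationHandle toTOperationHandle(
           SessionHandle sessionHandle,
           OperationHandle operationHandle,
           OperationType operationType,
           boolean hasResult) {
       return new TOperationHandle(
               // publicId = operation identifier, secretId = session identifier
               toTHandleIdentifier(
                       operationHandle.getIdentifier(), sessionHandle.getIdentifier()),
               toTOperationType(operationType),
               hasResult);
   }

   // The reverse conversions must then read the swapped fields:
   public static SessionHandle toSessionHandle(TOperationHandle tOperationHandle) {
       ByteBuffer bb = ByteBuffer.wrap(tOperationHandle.getOperationId().getSecret());
       return new SessionHandle(new UUID(bb.getLong(), bb.getLong()));
   }

   public static OperationHandle toOperationHandle(TOperationHandle tOperationHandle) {
       ByteBuffer bb = ByteBuffer.wrap(tOperationHandle.getOperationId().getGuid());
       return new OperationHandle(new UUID(bb.getLong(), bb.getLong()));
   }
   ```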



##########
flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java:
##########
@@ -108,6 +120,16 @@ void closeOperation(SessionHandle sessionHandle, OperationHandle operationHandle
     OperationInfo getOperationInfo(SessionHandle sessionHandle, OperationHandle operationHandle)
             throws SqlGatewayException;
 
+    /**
+     * Get the result schema for the specified Operation.
+     *
+     * @param sessionHandle handle to identify the session.
+     * @param operationHandle handle to identify the operation.
+     */
+    ResolvedSchema getOperationResultSchema(

Review Comment:
   Add a NOTE to warn users that this method should only be called when `getOperationInfo(..).getStatus() == FINISHED`. TBH, this sounds too restrictive to me (what if it is a long-running query?).
   
   Besides, does the Hive driver client follow this contract? Otherwise, exceptions may occur when running SELECT queries.
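
   For example, a client that honors the documented contract would have to poll before reading the schema. The method names come from this interface; the polling loop itself is only an illustration:

   ```java
   ResolvedSchema waitForResultSchema(
           SqlGatewayService service,
           SessionHandle sessionHandle,
           OperationHandle operationHandle) throws Exception {
       // Block until the operation reports FINISHED, then ask for the schema.
       while (service.getOperationInfo(sessionHandle, operationHandle).getStatus()
               != OperationStatus.FINISHED) {
           Thread.sleep(100); // naive busy-wait, for illustration only
       }
       return service.getOperationResultSchema(sessionHandle, operationHandle);
   }
   ```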



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java:
##########
@@ -18,28 +18,238 @@
 
 package org.apache.flink.table.endpoint.hive.util;
 
-import org.apache.flink.table.gateway.api.HandleIdentifier;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import org.apache.flink.table.gateway.api.operation.OperationStatus;
+import org.apache.flink.table.gateway.api.operation.OperationType;
+import org.apache.flink.table.gateway.api.results.FetchOrientation;
 import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.utils.print.RowDataToStringConverter;
+import org.apache.flink.types.RowKind;
 
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.thrift.Type;
+import org.apache.hive.service.rpc.thrift.TBinaryColumn;
+import org.apache.hive.service.rpc.thrift.TBoolColumn;
+import org.apache.hive.service.rpc.thrift.TBoolValue;
+import org.apache.hive.service.rpc.thrift.TByteColumn;
+import org.apache.hive.service.rpc.thrift.TByteValue;
+import org.apache.hive.service.rpc.thrift.TCLIServiceConstants;
+import org.apache.hive.service.rpc.thrift.TColumn;
+import org.apache.hive.service.rpc.thrift.TColumnDesc;
+import org.apache.hive.service.rpc.thrift.TColumnValue;
+import org.apache.hive.service.rpc.thrift.TDoubleColumn;
+import org.apache.hive.service.rpc.thrift.TDoubleValue;
+import org.apache.hive.service.rpc.thrift.TFetchOrientation;
 import org.apache.hive.service.rpc.thrift.THandleIdentifier;
+import org.apache.hive.service.rpc.thrift.TI16Column;
+import org.apache.hive.service.rpc.thrift.TI16Value;
+import org.apache.hive.service.rpc.thrift.TI32Column;
+import org.apache.hive.service.rpc.thrift.TI32Value;
+import org.apache.hive.service.rpc.thrift.TI64Column;
+import org.apache.hive.service.rpc.thrift.TI64Value;
+import org.apache.hive.service.rpc.thrift.TOperationHandle;
+import org.apache.hive.service.rpc.thrift.TOperationState;
+import org.apache.hive.service.rpc.thrift.TOperationType;
+import org.apache.hive.service.rpc.thrift.TPrimitiveTypeEntry;
+import org.apache.hive.service.rpc.thrift.TProtocolVersion;
+import org.apache.hive.service.rpc.thrift.TRow;
+import org.apache.hive.service.rpc.thrift.TRowSet;
 import org.apache.hive.service.rpc.thrift.TSessionHandle;
 import org.apache.hive.service.rpc.thrift.TStatus;
 import org.apache.hive.service.rpc.thrift.TStatusCode;
+import org.apache.hive.service.rpc.thrift.TStringColumn;
+import org.apache.hive.service.rpc.thrift.TStringValue;
+import org.apache.hive.service.rpc.thrift.TTableSchema;
+import org.apache.hive.service.rpc.thrift.TTypeDesc;
+import org.apache.hive.service.rpc.thrift.TTypeEntry;
+import org.apache.hive.service.rpc.thrift.TTypeQualifierValue;
+import org.apache.hive.service.rpc.thrift.TTypeQualifiers;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointVersion.HIVE_CLI_SERVICE_PROTOCOL_V6;
 
 /** Conversion between thrift object and flink object. */
 public class ThriftObjectConversions {
 
+    private static final UUID SECRET_ID = UUID.fromString("b06fa16a-3d16-475f-b510-6c64abb9b173");
+
+    // --------------------------------------------------------------------------------------------
+    // Flink SessionHandle from/to Hive SessionHandle
+    // --------------------------------------------------------------------------------------------
+
     public static TSessionHandle toTSessionHandle(SessionHandle sessionHandle) {
-        return new TSessionHandle(toTHandleIdentifier(sessionHandle.getIdentifier()));
+        return new TSessionHandle(toTHandleIdentifier(sessionHandle.getIdentifier(), SECRET_ID));
     }
 
     public static SessionHandle toSessionHandle(TSessionHandle tSessionHandle) {
-        return new SessionHandle(toHandleIdentifier(tSessionHandle.getSessionId()));
+        ByteBuffer bb = ByteBuffer.wrap(tSessionHandle.getSessionId().getGuid());
+        return new SessionHandle(new UUID(bb.getLong(), bb.getLong()));
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Flink SessionHandle && OperationHandle from/to Hive OperationHandle
+    // --------------------------------------------------------------------------------------------
+
+    public static TOperationHandle toTOperationHandle(
+            SessionHandle sessionHandle,
+            OperationHandle operationHandle,
+            OperationType operationType,
+            boolean hasResult) {
+        return new TOperationHandle(
+                toTHandleIdentifier(sessionHandle.getIdentifier(), operationHandle.getIdentifier()),
+                toTOperationType(operationType),
+                hasResult);
+    }
+
+    public static SessionHandle toSessionHandle(TOperationHandle tOperationHandle) {
+        ByteBuffer bb = ByteBuffer.wrap(tOperationHandle.getOperationId().getGuid());
+        return new SessionHandle(new UUID(bb.getLong(), bb.getLong()));
+    }
+
+    public static OperationHandle toOperationHandle(TOperationHandle tOperationHandle) {
+        ByteBuffer bb = ByteBuffer.wrap(tOperationHandle.getOperationId().getSecret());
+        return new OperationHandle(new UUID(bb.getLong(), bb.getLong()));
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Operation related conversions
+    // --------------------------------------------------------------------------------------------
+
+    public static TOperationType toTOperationType(OperationType type) {
+        switch (type) {
+            case EXECUTE_STATEMENT:
+                return TOperationType.EXECUTE_STATEMENT;
+            case UNKNOWN:
+                return TOperationType.UNKNOWN;
+            default:
+                throw new IllegalArgumentException(
+                        String.format("Unknown operation type: %s.", type));
+        }
+    }
+
+    public static TOperationState toTOperationState(OperationStatus operationStatus) {
+        switch (operationStatus) {
+            case INITIALIZED:
+                return TOperationState.INITIALIZED_STATE;
+            case PENDING:
+                return TOperationState.PENDING_STATE;
+            case RUNNING:
+                return TOperationState.RUNNING_STATE;
+            case FINISHED:
+                return TOperationState.FINISHED_STATE;
+            case ERROR:
+                return TOperationState.ERROR_STATE;
+            case TIMEOUT:
+                return TOperationState.TIMEDOUT_STATE;
+            case CANCELED:
+                return TOperationState.CANCELED_STATE;
+            case CLOSED:
+                return TOperationState.CLOSED_STATE;
+            default:
+                throw new IllegalArgumentException(
+                        String.format("Unknown operation status: %s.", operationStatus));
+        }
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Statement related conversions
+    // --------------------------------------------------------------------------------------------
+
+    public static FetchOrientation toFetchOrientation(int fetchOrientation) {
+        if (fetchOrientation == TFetchOrientation.FETCH_NEXT.getValue()) {
+            return FetchOrientation.FETCH_NEXT;
+        } else if (fetchOrientation == TFetchOrientation.FETCH_PRIOR.getValue()) {
+            return FetchOrientation.FETCH_PRIOR;
+        } else {
+            throw new UnsupportedOperationException(
+                    String.format("Unsupported fetch orientation: %s.", fetchOrientation));
+        }
+    }
+
+    /** Similar logic in the {@code org.apache.hive.service.cli.ColumnDescriptor}. */
+    public static TTableSchema toTTableSchema(ResolvedSchema schema) {
+        TTableSchema tSchema = new TTableSchema();
+
+        for (int i = 0; i < schema.getColumnCount(); i++) {
+            Column column = schema.getColumns().get(i);
+            TColumnDesc desc = new TColumnDesc();
+            desc.setColumnName(column.getName());
+            column.getComment().ifPresent(desc::setComment);
+            desc.setPosition(i);
+
+            TTypeDesc typeDesc = new TTypeDesc();
+
+            // Hive uses the TPrimitiveTypeEntry only. Please refer to TypeDescriptor#toTTypeDesc.
+            DataType columnType = column.getDataType();
+            TPrimitiveTypeEntry typeEntry =
+                    new TPrimitiveTypeEntry(

Review Comment:
   Does this mean we only support mapping primitive types, and complex types are not supported?
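
   If I remember correctly, Hive's own `TypeDescriptor#toTTypeDesc` also emits only `TPrimitiveTypeEntry`, and the `TTypeId` enum still carries complex ids (`ARRAY_TYPE`, `MAP_TYPE`, `STRUCT_TYPE`), with the values rendered as strings on the wire. A hedged sketch of what that looks like:

   ```java
   // Even an ARRAY column can be described through the "primitive" entry,
   // because TTypeId includes ids for complex types.
   TTypeDesc typeDesc = new TTypeDesc();
   typeDesc.addToTypes(
           TTypeEntry.primitiveEntry(new TPrimitiveTypeEntry(TTypeId.ARRAY_TYPE)));
   ```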



##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversionsTest.java:
##########
@@ -34,4 +110,250 @@ public void testConvertSessionHandle() {
         SessionHandle originSessionHandle = SessionHandle.create();
         assertEquals(toSessionHandle(toTSessionHandle(originSessionHandle)), originSessionHandle);
     }
+
+    @Test
+    public void testConvertSessionHandleAndOperationHandle() {
+        SessionHandle originSessionHandle = SessionHandle.create();
+        OperationHandle originOperationHandle = OperationHandle.create();
+        TOperationHandle tOperationHandle =
+                toTOperationHandle(
+                        originSessionHandle, originOperationHandle, OperationType.UNKNOWN, false);
+
+        assertEquals(toSessionHandle(tOperationHandle), originSessionHandle);
+        assertEquals(toOperationHandle(tOperationHandle), originOperationHandle);
+    }
+
+    @Test
+    public void testConvertOperationStatus() {
+        Map<OperationStatus, TOperationState> expectedMappings = new HashMap<>();
+        expectedMappings.put(INITIALIZED, TOperationState.INITIALIZED_STATE);
+        expectedMappings.put(PENDING, TOperationState.PENDING_STATE);
+        expectedMappings.put(RUNNING, TOperationState.RUNNING_STATE);
+        expectedMappings.put(FINISHED, TOperationState.FINISHED_STATE);
+        expectedMappings.put(CANCELED, TOperationState.CANCELED_STATE);
+        expectedMappings.put(CLOSED, TOperationState.CLOSED_STATE);
+        expectedMappings.put(ERROR, TOperationState.ERROR_STATE);
+        expectedMappings.put(TIMEOUT, TOperationState.TIMEDOUT_STATE);
+
+        for (OperationStatus status : expectedMappings.keySet()) {
+            assertEquals(expectedMappings.get(status), toTOperationState(status));
+        }
+    }
+
+    @Test
+    public void testToTTableSchema() {

Review Comment:
   ?



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java:
##########
@@ -290,12 +309,34 @@ public void close() {
         }
 
         public ResultSet fetchResults(long token, int maxRows) {
+            return fetchResultsInternal(() -> resultFetcher.fetchResults(token, maxRows));
+        }
+
+        public ResultSet fetchResults(FetchOrientation orientation, int maxRows) {
+            return fetchResultsInternal(() -> resultFetcher.fetchResults(orientation, maxRows));
+        }
+
+        public ResolvedSchema getResultSchema() {
+            OperationStatus current = status.get();
+            if (current != OperationStatus.FINISHED || !hasResults) {

Review Comment:
   `hasResults` is always `true`?



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/print/RowDataToStringConverter.java:
##########
@@ -24,5 +24,15 @@
 /** Interface to convert {@link RowData} to {@link String} using the SQL casting rules. */
 @Internal
 public interface RowDataToStringConverter {
-    String[] convert(RowData rowData);
+
+    default String[] convert(RowData rowData) {
+        int arity = rowData.getArity();
+        String[] converted = new String[arity];
+        for (int i = 0; i < arity; i++) {
+            converted[i] = convert(rowData, i);
+        }
+        return converted;
+    }
+
+    String convert(RowData rowData, int index);

Review Comment:
   1. Add comments to the methods.
   2. Rename `convert` to `convertColumn`.
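
   Something like the following; the javadoc wording is only a suggestion:

   ```java
   /** Interface to convert {@link RowData} to {@link String} using the SQL casting rules. */
   @Internal
   public interface RowDataToStringConverter {

       /** Converts every column of the given row to its SQL string representation. */
       default String[] convert(RowData rowData) {
           int arity = rowData.getArity();
           String[] converted = new String[arity];
           for (int i = 0; i < arity; i++) {
               converted[i] = convertColumn(rowData, i);
           }
           return converted;
       }

       /** Converts the column at the given index to its SQL string representation. */
       String convertColumn(RowData rowData, int index);
   }
   ```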



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java:
##########
@@ -51,25 +261,250 @@ public static TStatus toTStatus(Throwable t) {
 
     // --------------------------------------------------------------------------------------------
 
-    private static THandleIdentifier toTHandleIdentifier(HandleIdentifier identifier) {
+    private static THandleIdentifier toTHandleIdentifier(UUID publicId, UUID secretId) {
         byte[] guid = new byte[16];
         byte[] secret = new byte[16];
         ByteBuffer guidBB = ByteBuffer.wrap(guid);
         ByteBuffer secretBB = ByteBuffer.wrap(secret);
 
-        guidBB.putLong(identifier.getPublicId().getMostSignificantBits());
-        guidBB.putLong(identifier.getPublicId().getLeastSignificantBits());
-        secretBB.putLong(identifier.getSecretId().getMostSignificantBits());
-        secretBB.putLong(identifier.getSecretId().getLeastSignificantBits());
+        guidBB.putLong(publicId.getMostSignificantBits());
+        guidBB.putLong(publicId.getLeastSignificantBits());
+        secretBB.putLong(secretId.getMostSignificantBits());
+        secretBB.putLong(secretId.getLeastSignificantBits());
         return new THandleIdentifier(ByteBuffer.wrap(guid), ByteBuffer.wrap(secret));
     }
 
-    private static HandleIdentifier toHandleIdentifier(THandleIdentifier tHandleId) {
-        ByteBuffer bb = ByteBuffer.wrap(tHandleId.getGuid());
-        UUID publicId = new UUID(bb.getLong(), bb.getLong());
-        bb = ByteBuffer.wrap(tHandleId.getSecret());
-        UUID secretId = new UUID(bb.getLong(), bb.getLong());
-        return new HandleIdentifier(publicId, secretId);
+    @VisibleForTesting
+    public static TRowSet toColumnBasedSet(
+            List<LogicalType> fieldTypes,
+            List<RowData.FieldGetter> fieldGetters,
+            RowDataToStringConverter converter,
+            List<RowData> rows) {
+        int rowNum = rows.size();
+        // TODO: Support accurate start offset
+        TRowSet rowSet = new TRowSet(0, new ArrayList<>(rowNum));
+        for (int i = 0; i < fieldTypes.size(); i++) {
+            int index = i;
+            rowSet.addToColumns(
+                    toTColumn(
+                            fieldTypes.get(i),
+                            fieldGetters.get(i),
+                            row -> row.isNullAt(index),
+                            row -> converter.convert(row, index),
+                            rows));
+        }
+        return rowSet;
+    }
+
+    private static TColumn toTColumn(
+            LogicalType fieldType,
+            RowData.FieldGetter fieldGetter,
+            Function<RowData, Boolean> isNull,
+            Function<RowData, String> stringifiedValue,
+            List<RowData> rows) {
+        BitSet nulls = new BitSet();
+        switch (fieldType.getTypeRoot()) {

Review Comment:
   According to https://github.com/apache/hive/blame/master/service-rpc/if/TCLIService.thrift#L405, should we map TIMESTAMP and TIMESTAMP_LTZ to int64 as well?
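
   Roughly, the extra cases in the switch could look like the sketch below. The epoch-millis extraction and the precision handling are assumptions on my side; `TColumn.i64Val` and `TI64Column` are the generated thrift APIs:

   ```java
   case TIMESTAMP_WITHOUT_TIME_ZONE:
   case TIMESTAMP_WITH_LOCAL_TIME_ZONE: {
       List<Long> values = new ArrayList<>();
       for (int i = 0; i < rows.size(); i++) {
           RowData row = rows.get(i);
           if (isNull.apply(row)) {
               nulls.set(i);
               values.add(0L); // placeholder value for a null slot
           } else {
               TimestampData ts = (TimestampData) fieldGetter.getFieldOrNull(row);
               values.add(ts.getMillisecond()); // epoch millis
           }
       }
       return TColumn.i64Val(new TI64Column(values, ByteBuffer.wrap(nulls.toByteArray())));
   }
   ```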



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org