You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2023/04/21 06:05:03 UTC

[doris] 02/08: [fix](editLog) add sufficient replay logic and edit log for altering light schema change (#18746)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.0-alpha
in repository https://gitbox.apache.org/repos/asf/doris.git

commit de36bf3c074d602021f78c5803d5ca8edab861bb
Author: 奕冷 <82...@users.noreply.github.com>
AuthorDate: Thu Apr 20 19:20:03 2023 +0800

    [fix](editLog) add sufficient replay logic and edit log for altering light schema change (#18746)
---
 ...CHelper.java => AlterLightSchChangeHelper.java} | 52 ++++++++---------
 .../apache/doris/alter/SchemaChangeHandler.java    | 19 +++++-
 .../org/apache/doris/catalog/TableProperty.java    |  3 -
 .../org/apache/doris/journal/JournalEntity.java    |  7 ++-
 .../doris/persist/AlterLightSchemaChangeInfo.java  | 68 ++++++++++++++++++++++
 .../java/org/apache/doris/persist/EditLog.java     |  8 ++-
 6 files changed, 123 insertions(+), 34 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterLSCHelper.java b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterLightSchChangeHelper.java
similarity index 87%
rename from fe/fe-core/src/main/java/org/apache/doris/alter/AlterLSCHelper.java
rename to fe/fe-core/src/main/java/org/apache/doris/alter/AlterLightSchChangeHelper.java
index 1b808e10ea..d643aec58b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterLSCHelper.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterLightSchChangeHelper.java
@@ -28,7 +28,7 @@ import org.apache.doris.catalog.Partition;
 import org.apache.doris.catalog.Tablet;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.Pair;
-import org.apache.doris.persist.ModifyTablePropertyOperationLog;
+import org.apache.doris.persist.AlterLightSchemaChangeInfo;
 import org.apache.doris.proto.InternalService.PFetchColIdsRequest;
 import org.apache.doris.proto.InternalService.PFetchColIdsRequest.Builder;
 import org.apache.doris.proto.InternalService.PFetchColIdsRequest.PFetchColIdParam;
@@ -42,6 +42,8 @@ import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TStatusCode;
 
 import com.google.common.base.Preconditions;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -59,13 +61,15 @@ import java.util.concurrent.TimeoutException;
 /**
  * For alter light_schema_change table property
  */
-public class AlterLSCHelper {
+public class AlterLightSchChangeHelper {
+
+    private static final Logger LOG = LogManager.getLogger(AlterLightSchChangeHelper.class);
 
     private final Database db;
 
     private final OlapTable olapTable;
 
-    public AlterLSCHelper(Database db, OlapTable olapTable) {
+    public AlterLightSchChangeHelper(Database db, OlapTable olapTable) {
         this.db = db;
         this.olapTable = olapTable;
     }
@@ -77,9 +81,10 @@ public class AlterLSCHelper {
      */
     public void enableLightSchemaChange() throws DdlException {
         final Map<Long, PFetchColIdsRequest> params = initParams();
-        final PFetchColIdsResponse response = callForColumnIds(params);
-        updateTableMeta(response);
-        modifyTableProperty();
+        final AlterLightSchemaChangeInfo info = callForColumnsInfo(params);
+        updateTableMeta(info);
+        Env.getCurrentEnv().getEditLog().logAlterLightSchemaChange(info);
+        LOG.info("successfully enable `light_schema_change`");
     }
 
     /**
@@ -133,10 +138,12 @@ public class AlterLSCHelper {
 
     /**
      * @param beIdToRequest rpc param for corresponding BEs
-     * @return indexIds to each tablet schema info which consists of columnName to corresponding column unique id pairs
+     * @return contains indexIds to each tablet schema info which consists of columnName to corresponding
+     * column unique id pairs
      * @throws DdlException as a wrapper for rpc failures
      */
-    private PFetchColIdsResponse callForColumnIds(Map<Long, PFetchColIdsRequest> beIdToRequest) throws DdlException {
+    private AlterLightSchemaChangeInfo callForColumnsInfo(Map<Long, PFetchColIdsRequest> beIdToRequest)
+            throws DdlException {
         final List<Future<PFetchColIdsResponse>> futureList = new ArrayList<>();
         // start a rpc in a pipeline way
         try {
@@ -173,14 +180,14 @@ public class AlterLSCHelper {
         } catch (TimeoutException e) {
             throw new DdlException("fetch columnIds RPC result timeout", e);
         }
-        return compactToUniqResp(resultList);
+        return compactToAlterLscInfo(resultList);
     }
 
     /**
      * Since the result collected from several BEs may contain repeated indexes in distributed storage scenarios,
      * we should do consistency check for the result for the same index, and get the unique result.
      */
-    private PFetchColIdsResponse compactToUniqResp(List<PFetchColIdsResponse> resultList) {
+    private AlterLightSchemaChangeInfo compactToAlterLscInfo(List<PFetchColIdsResponse> resultList) {
         final PFetchColIdsResponse.Builder builder = PFetchColIdsResponse.newBuilder();
         Map<Long, Map<String, Integer>> indexIdToTabletInfo = new HashMap<>();
         resultList.forEach(response -> {
@@ -197,27 +204,25 @@ public class AlterLSCHelper {
                         "index: " + indexId + "got inconsistent schema in storage");
             }
         });
-        return builder.build();
+        return new AlterLightSchemaChangeInfo(db.getId(), olapTable.getId(), indexIdToTabletInfo);
     }
 
-    private void updateTableMeta(PFetchColIdsResponse response) throws DdlException {
-        Preconditions.checkState(response.isInitialized());
+    public void updateTableMeta(AlterLightSchemaChangeInfo info) throws DdlException {
+        Preconditions.checkNotNull(info, "passed in info should be not null");
         // update index-meta once and for all
         // schema pair: <maxColId, columns>
         final List<Pair<Integer, List<Column>>> schemaPairs = new ArrayList<>();
         final List<Long> indexIds = new ArrayList<>();
-        response.getEntriesList().forEach(entry -> {
-            final long indexId = entry.getIndexId();
+        info.getIndexIdToColumnInfo().forEach((indexId, colNameToId) -> {
             final List<Column> columns = olapTable.getSchemaByIndexId(indexId, true);
-            final Map<String, Integer> colNameToId = entry.getColNameToIdMap();
             Preconditions.checkState(columns.size() == colNameToId.size(),
-                    "size mismatch for columns meta from BE");
+                    "size mismatch for original columns meta and that in change info");
             int maxColId = Column.COLUMN_UNIQUE_ID_INIT_VALUE;
             final List<Column> newSchema = new ArrayList<>();
             for (Column column : columns) {
                 final String columnName = column.getName();
                 final int columnId = Preconditions.checkNotNull(colNameToId.get(columnName),
-                        "failed to fetch column id of column:{" + columnName + "} from BE");
+                        "failed to fetch column id of column:{" + columnName + "}");
                 final Column newColumn = new Column(column);
                 newColumn.setUniqueId(columnId);
                 newSchema.add(newColumn);
@@ -226,7 +231,8 @@ public class AlterLSCHelper {
             schemaPairs.add(Pair.of(maxColId, newSchema));
             indexIds.add(indexId);
         });
-        Preconditions.checkState(schemaPairs.size() == indexIds.size());
+        Preconditions.checkState(schemaPairs.size() == indexIds.size(),
+                "impossible state, size of schemaPairs and indexIds should be the same");
         // update index-meta once and for all
         try {
             for (int i = 0; i < indexIds.size(); i++) {
@@ -238,14 +244,8 @@ public class AlterLSCHelper {
         } catch (IOException e) {
             throw new DdlException("fail to reset index schema", e);
         }
-    }
-
-    private void modifyTableProperty() {
         // write table property
         olapTable.setEnableLightSchemaChange(true);
-        //write edit log
-        ModifyTablePropertyOperationLog info = new ModifyTablePropertyOperationLog(db.getId(), olapTable.getId(),
-                olapTable.getTableProperty().getProperties());
-        Env.getCurrentEnv().getEditLog().logAlterLightSchemaChange(info);
+        LOG.info("successfully update table meta for `light_schema_change`");
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
index ffce0bbb9b..c9ce361e3c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
@@ -73,6 +73,7 @@ import org.apache.doris.common.util.ListComparator;
 import org.apache.doris.common.util.PropertyAnalyzer;
 import org.apache.doris.common.util.Util;
 import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.persist.AlterLightSchemaChangeInfo;
 import org.apache.doris.persist.RemoveAlterJobV2OperationLog;
 import org.apache.doris.persist.TableAddOrDropColumnsInfo;
 import org.apache.doris.persist.TableAddOrDropInvertedIndicesInfo;
@@ -1890,8 +1891,22 @@ public class SchemaChangeHandler extends AlterHandler {
 
 
     private void enableLightSchemaChange(Database db, OlapTable olapTable) throws DdlException {
-        final AlterLSCHelper alterLSCHelper = new AlterLSCHelper(db, olapTable);
-        alterLSCHelper.enableLightSchemaChange();
+        final AlterLightSchChangeHelper alterLightSchChangeHelper = new AlterLightSchChangeHelper(db, olapTable);
+        alterLightSchChangeHelper.enableLightSchemaChange();
+    }
+
+    public void replayAlterLightSchChange(AlterLightSchemaChangeInfo info) throws MetaNotFoundException {
+        Database db = Env.getCurrentEnv().getInternalCatalog().getDbOrMetaException(info.getDbId());
+        OlapTable olapTable = (OlapTable) db.getTableOrMetaException(info.getTableId(), TableType.OLAP);
+        olapTable.writeLock();
+        final AlterLightSchChangeHelper alterLightSchChangeHelper = new AlterLightSchChangeHelper(db, olapTable);
+        try {
+            alterLightSchChangeHelper.updateTableMeta(info);
+        } catch (DdlException e) {
+            LOG.warn("failed to replay alter light schema change", e);
+        } finally {
+            olapTable.writeUnlock();
+        }
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java
index add5162d75..4078b7473d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java
@@ -107,9 +107,6 @@ public class TableProperty implements Writable {
                 buildInMemory();
                 buildStoragePolicy();
                 break;
-            case OperationType.OP_ALTER_LIGHT_SCHEMA_CHANGE:
-                buildEnableLightSchemaChange();
-                break;
             default:
                 break;
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
index ccbbd764a3..2162fc9702 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
@@ -61,6 +61,7 @@ import org.apache.doris.mtmv.metadata.DropMTMVTask;
 import org.apache.doris.mtmv.metadata.MTMVJob;
 import org.apache.doris.mtmv.metadata.MTMVTask;
 import org.apache.doris.mysql.privilege.UserPropertyInfo;
+import org.apache.doris.persist.AlterLightSchemaChangeInfo;
 import org.apache.doris.persist.AlterMultiMaterializedView;
 import org.apache.doris.persist.AlterRoutineLoadJobOperationLog;
 import org.apache.doris.persist.AlterUserOperationLog;
@@ -625,7 +626,6 @@ public class JournalEntity implements Writable {
             }
             case OperationType.OP_DYNAMIC_PARTITION:
             case OperationType.OP_MODIFY_IN_MEMORY:
-            case OperationType.OP_ALTER_LIGHT_SCHEMA_CHANGE:
             case OperationType.OP_MODIFY_REPLICATION_NUM: {
                 data = ModifyTablePropertyOperationLog.read(in);
                 isRead = true;
@@ -808,6 +808,11 @@ public class JournalEntity implements Writable {
                 isRead = true;
                 break;
             }
+            case OperationType.OP_ALTER_LIGHT_SCHEMA_CHANGE: {
+                data = AlterLightSchemaChangeInfo.read(in);
+                isRead = true;
+                break;
+            }
             default: {
                 IOException e = new IOException();
                 LOG.error("UNKNOWN Operation Type {}", opCode, e);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/AlterLightSchemaChangeInfo.java b/fe/fe-core/src/main/java/org/apache/doris/persist/AlterLightSchemaChangeInfo.java
new file mode 100644
index 0000000000..e76ebe9d64
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/AlterLightSchemaChangeInfo.java
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.persist;
+
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.gson.GsonUtils;
+
+import com.google.gson.annotations.SerializedName;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+
+public class AlterLightSchemaChangeInfo implements Writable {
+
+    @SerializedName(value = "dbId")
+    private Long dbId;
+
+    @SerializedName(value = "tableId")
+    private Long tableId;
+
+    @SerializedName("indexIdToColumnInfo")
+    private Map<Long, Map<String, Integer>> indexIdToColumnInfo;
+
+    public AlterLightSchemaChangeInfo(long dbId, long tableId, Map<Long, Map<String, Integer>> info) {
+        this.dbId = dbId;
+        this.tableId = tableId;
+        this.indexIdToColumnInfo = info;
+    }
+
+    public Long getDbId() {
+        return dbId;
+    }
+
+    public Long getTableId() {
+        return tableId;
+    }
+
+    public Map<Long, Map<String, Integer>> getIndexIdToColumnInfo() {
+        return indexIdToColumnInfo;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        Text.writeString(out, GsonUtils.GSON.toJson(this));
+    }
+
+    public static AlterLightSchemaChangeInfo read(DataInput in) throws IOException {
+        return GsonUtils.GSON.fromJson(Text.readString(in), AlterLightSchemaChangeInfo.class);
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index fc837efd70..63d3f348aa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -765,7 +765,6 @@ public class EditLog {
                 }
                 case OperationType.OP_DYNAMIC_PARTITION:
                 case OperationType.OP_MODIFY_IN_MEMORY:
-                case OperationType.OP_ALTER_LIGHT_SCHEMA_CHANGE:
                 case OperationType.OP_MODIFY_REPLICATION_NUM: {
                     ModifyTablePropertyOperationLog log = (ModifyTablePropertyOperationLog) journal.getData();
                     env.replayModifyTableProperty(opCode, log);
@@ -897,6 +896,11 @@ public class EditLog {
                     env.getSchemaChangeHandler().replayModifyTableLightSchemaChange(info);
                     break;
                 }
+                case OperationType.OP_ALTER_LIGHT_SCHEMA_CHANGE: {
+                    final AlterLightSchemaChangeInfo info = (AlterLightSchemaChangeInfo) journal.getData();
+                    env.getSchemaChangeHandler().replayAlterLightSchChange(info);
+                    break;
+                }
                 case OperationType.OP_MODIFY_TABLE_ADD_OR_DROP_INVERTED_INDICES: {
                     final TableAddOrDropInvertedIndicesInfo info =
                             (TableAddOrDropInvertedIndicesInfo) journal.getData();
@@ -1607,7 +1611,7 @@ public class EditLog {
         logEdit(OperationType.OP_MODIFY_IN_MEMORY, info);
     }
 
-    public void logAlterLightSchemaChange(ModifyTablePropertyOperationLog info) {
+    public void logAlterLightSchemaChange(AlterLightSchemaChangeInfo info) {
         logEdit(OperationType.OP_ALTER_LIGHT_SCHEMA_CHANGE, info);
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org