Posted to dev@phoenix.apache.org by vincentpoon <gi...@git.apache.org> on 2017/08/18 00:56:55 UTC

[GitHub] phoenix pull request #269: PHOENIX-2460 Implement scrutiny command to valida...

GitHub user vincentpoon opened a pull request:

    https://github.com/apache/phoenix/pull/269

    PHOENIX-2460 Implement scrutiny command to validate whether or not an…

    … index is in sync with the data table

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/vincentpoon/phoenix PHOENIX-2460

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/phoenix/pull/269.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #269
    
----
commit 7fab3f1ea6cc6829b107f358fc6faf5f6e0afa83
Author: Vincent <vi...@gmail.com>
Date:   2017-08-18T00:46:41Z

    PHOENIX-2460 Implement scrutiny command to validate whether or not an index is in sync with the data table

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] phoenix pull request #269: PHOENIX-2460 Implement scrutiny command to valida...

Posted by JamesRTaylor <gi...@git.apache.org>.
Github user JamesRTaylor commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/269#discussion_r134631925
  
    --- Diff: phoenix-core/src/test/java/org/apache/phoenix/mapreduce/index/TestIndexScrutinyTableOutput.java ---
    @@ -0,0 +1,93 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
    + * agreements. See the NOTICE file distributed with this work for additional information regarding
    + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance with the License. You may obtain a
    + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
    + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
    + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
    + * for the specific language governing permissions and limitations under the License.
    + */
    +package org.apache.phoenix.mapreduce.index;
    +
    +import static org.junit.Assert.assertEquals;
    +
    +import java.sql.SQLException;
    +import java.util.Arrays;
    +
    +import org.apache.phoenix.mapreduce.util.IndexColumnNames;
    +import org.junit.Before;
    +import org.junit.Test;
    +
    +public class TestIndexScrutinyTableOutput extends BaseIndexTest {
    +
    +    private static final long SCRUTINY_TIME_MILLIS = 1502908914193L;
    +
    +    @Before
    +    public void setup() throws Exception {
    +        super.setup();
    +        conn.createStatement().execute(IndexScrutinyTableOutput.OUTPUT_TABLE_DDL);
    +        conn.createStatement().execute(IndexScrutinyTableOutput.OUTPUT_METADATA_DDL);
    +        conn.commit();
    --- End diff --
    
    Minor nit: commit() is not necessary after DDL.



[GitHub] phoenix pull request #269: PHOENIX-2460 Implement scrutiny command to valida...

Posted by JamesRTaylor <gi...@git.apache.org>.
Github user JamesRTaylor commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/269#discussion_r134632114
  
    --- Diff: phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/TestIndexColumnNames.java ---
    @@ -0,0 +1,74 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.phoenix.mapreduce.util;
    +
    +import static org.junit.Assert.assertEquals;
    +
    +import java.sql.SQLException;
    +
    +import org.apache.phoenix.jdbc.PhoenixConnection;
    +import org.apache.phoenix.mapreduce.index.BaseIndexTest;
    +import org.apache.phoenix.parse.HintNode.Hint;
    +import org.apache.phoenix.schema.PTableKey;
    +import org.apache.phoenix.util.QueryUtil;
    +import org.junit.Test;
    +
    +public class TestIndexColumnNames extends BaseIndexTest {
    --- End diff --
    
    Same naming nit here: by convention this class should be IndexColumnNamesTest.



[GitHub] phoenix issue #269: PHOENIX-2460 Implement scrutiny command to validate whet...

Posted by vincentpoon <gi...@git.apache.org>.
Github user vincentpoon commented on the issue:

    https://github.com/apache/phoenix/pull/269
  
    @JamesRTaylor thanks for the comments. I now also write the timestamp from the cell to the output table when there's a mismatch. I also fixed the nits and now use MD5. It ran fine on a cluster with 800M rows.



[GitHub] phoenix pull request #269: PHOENIX-2460 Implement scrutiny command to valida...

Posted by JamesRTaylor <gi...@git.apache.org>.
Github user JamesRTaylor commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/269#discussion_r134632028
  
    --- Diff: phoenix-core/src/test/java/org/apache/phoenix/mapreduce/index/TestIndexScrutinyTableOutput.java ---
    @@ -0,0 +1,93 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
    + * agreements. See the NOTICE file distributed with this work for additional information regarding
    + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance with the License. You may obtain a
    + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
    + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
    + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
    + * for the specific language governing permissions and limitations under the License.
    + */
    +package org.apache.phoenix.mapreduce.index;
    +
    +import static org.junit.Assert.assertEquals;
    +
    +import java.sql.SQLException;
    +import java.util.Arrays;
    +
    +import org.apache.phoenix.mapreduce.util.IndexColumnNames;
    +import org.junit.Before;
    +import org.junit.Test;
    +
    +public class TestIndexScrutinyTableOutput extends BaseIndexTest {
    --- End diff --
    
    Minor nit: the Phoenix naming convention (with a few exceptions) is to suffix test class names with "Test": IndexScrutinyTableOutputTest



[GitHub] phoenix pull request #269: PHOENIX-2460 Implement scrutiny command to valida...

Posted by JamesRTaylor <gi...@git.apache.org>.
Github user JamesRTaylor commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/269#discussion_r134624315
  
    --- Diff: phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java ---
    @@ -0,0 +1,349 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.phoenix.mapreduce.index;
    +
    +import java.io.IOException;
    +import java.sql.Connection;
    +import java.sql.PreparedStatement;
    +import java.sql.ResultSet;
    +import java.sql.SQLException;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Objects;
    +import java.util.Properties;
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.io.NullWritable;
    +import org.apache.hadoop.io.Text;
    +import org.apache.hadoop.mapreduce.Mapper;
    +import org.apache.phoenix.mapreduce.PhoenixJobCounters;
    +import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.OutputFormat;
    +import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.SourceTable;
    +import org.apache.phoenix.mapreduce.util.ConnectionUtil;
    +import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
    +import org.apache.phoenix.parse.HintNode.Hint;
    +import org.apache.phoenix.schema.PTable;
    +import org.apache.phoenix.util.ColumnInfo;
    +import org.apache.phoenix.util.PhoenixRuntime;
    +import org.apache.phoenix.util.QueryUtil;
    +import org.apache.phoenix.util.SchemaUtil;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.google.common.base.Joiner;
    +
    +/**
    + * Mapper that reads from the data table and checks the rows against the index table
    + */
    +public class IndexScrutinyMapper extends Mapper<NullWritable, PhoenixIndexDBWritable, Text, Text> {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(IndexScrutinyMapper.class);
    +    private Connection connection;
    +    private List<ColumnInfo> targetTblColumnMetadata;
    +    private long batchSize;
    +    // holds a batch of rows from the table the mapper is iterating over
    +    private List<List<Object>> currentBatchValues = new ArrayList<>();
    +    private String targetTableQuery;
    +    private int numTargetPkCols;
    +    private boolean outputInvalidRows;
    +    private OutputFormat outputFormat = OutputFormat.FILE;
    +    private String qSourceTable;
    +    private String qTargetTable;
    +    private long executeTimestamp;
    +    private int numSourcePkCols;
    +    private final PhoenixIndexDBWritable indxWritable = new PhoenixIndexDBWritable();
    +    private List<ColumnInfo> sourceTblColumnMetadata;
    +
    +    // used to write results to the output table
    +    private Connection outputConn;
    +    private PreparedStatement outputUpsertStmt;
    +    private long outputMaxRows;
    +
    +    @Override
    +    protected void setup(final Context context) throws IOException, InterruptedException {
    +        super.setup(context);
    +        final Configuration configuration = context.getConfiguration();
    +        try {
    +            // get a connection with correct CURRENT_SCN (so incoming writes don't throw off the
    +            // scrutiny)
    +            final Properties overrideProps = new Properties();
    +            String scn = configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE);
    +            overrideProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, scn);
    +            connection = ConnectionUtil.getOutputConnection(configuration, overrideProps);
    +            connection.setAutoCommit(false);
    +            batchSize = PhoenixConfigurationUtil.getScrutinyBatchSize(configuration);
    +            outputInvalidRows =
    +                    PhoenixConfigurationUtil.getScrutinyOutputInvalidRows(configuration);
    +            outputFormat = PhoenixConfigurationUtil.getScrutinyOutputFormat(configuration);
    +            executeTimestamp = PhoenixConfigurationUtil.getScrutinyExecuteTimestamp(configuration);
    +
    +            // get the index table and column names
    +            String qDataTable = PhoenixConfigurationUtil.getScrutinyDataTableName(configuration);
    +            final PTable pdataTable = PhoenixRuntime.getTable(connection, qDataTable);
    +            final String qIndexTable =
    +                    PhoenixConfigurationUtil.getScrutinyIndexTableName(configuration);
    +            final PTable pindexTable = PhoenixRuntime.getTable(connection, qIndexTable);
    +
    +            // set the target table based on whether we're running the MR over the data or index
    +            // table
    +            SourceTable sourceTable =
    +                    PhoenixConfigurationUtil.getScrutinySourceTable(configuration);
    +            SourceTargetColumnNames columnNames =
    +                    SourceTable.DATA_TABLE_SOURCE.equals(sourceTable)
    +                            ? new SourceTargetColumnNames.DataSourceColNames(pdataTable,
    +                                    pindexTable)
    +                            : new SourceTargetColumnNames.IndexSourceColNames(pdataTable,
    +                                    pindexTable);
    +            qSourceTable = columnNames.getQualifiedSourceTableName();
    +            qTargetTable = columnNames.getQualifiedTargetTableName();
    +            List<String> targetColNames = columnNames.getTargetColNames();
    +            List<String> sourceColNames = columnNames.getSourceColNames();
    +            List<String> targetPkColNames = columnNames.getTargetPkColNames();
    +            String targetPksCsv =
    +                    Joiner.on(",").join(SchemaUtil.getEscapedFullColumnNames(targetPkColNames));
    +            numSourcePkCols = columnNames.getSourcePkColNames().size();
    +            numTargetPkCols = targetPkColNames.size();
    +
    +            if (OutputFormat.TABLE.equals(outputFormat)) {
    +                outputConn = ConnectionUtil.getOutputConnection(configuration, new Properties());
    +                String upsertQuery = PhoenixConfigurationUtil.getUpsertStatement(configuration);
    +                this.outputUpsertStmt = outputConn.prepareStatement(upsertQuery);
    +            }
    +            outputMaxRows = PhoenixConfigurationUtil.getScrutinyOutputMax(configuration);
    +
    +            // Create the query against the target table
    +            // Our query projection should be all the index column names (or their data table
    +            // equivalent
    +            // name)
    +            targetTableQuery =
    +                    QueryUtil.constructSelectStatement(qTargetTable, columnNames.getCastedTargetColNames(), targetPksCsv,
    +                        Hint.NO_INDEX, false) + " IN ";
    +            targetTblColumnMetadata =
    +                    PhoenixRuntime.generateColumnInfo(connection, qTargetTable, targetColNames);
    +            sourceTblColumnMetadata =
    +                    PhoenixRuntime.generateColumnInfo(connection, qSourceTable, sourceColNames);
    +            LOG.info("Base query against target table: " + targetTableQuery);
    +        } catch (SQLException e) {
    +            throw new RuntimeException(e);
    +        }
    +    }
    +
    +    @Override
    +    protected void map(NullWritable key, PhoenixIndexDBWritable record, Context context)
    +            throws IOException, InterruptedException {
    +        try {
    +            final List<Object> values = record.getValues();
    +
    +            context.getCounter(PhoenixJobCounters.INPUT_RECORDS).increment(1);
    +            currentBatchValues.add(values);
    +            if (context.getCounter(PhoenixJobCounters.INPUT_RECORDS).getValue() % batchSize != 0) {
    +                // if we haven't hit the batch size, just report progress and move on to next record
    +                context.progress();
    +                return;
    +            } else {
    +                // otherwise, process the batch
    +                processBatch(context);
    +            }
    +            context.progress(); // Make sure progress is reported to Application Master.
    +        } catch (SQLException | IllegalArgumentException e) {
    +            LOG.error(" Error while read/write of a record ", e);
    +            context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(1);
    +            throw new IOException(e);
    +        }
    +    }
    +
    +    @Override
    +    protected void cleanup(Context context) throws IOException, InterruptedException {
    +        super.cleanup(context);
    +        if (connection != null) {
    +            try {
    +                processBatch(context);
    +                connection.close();
    +                if (outputConn != null) {
    +                    outputConn.close();
    +                }
    +            } catch (SQLException e) {
    +                LOG.error("Error while closing connection in the PhoenixIndexMapper class ", e);
    +                throw new IOException(e);
    +            }
    +        }
    +    }
    +
    +    private void processBatch(Context context)
    +            throws SQLException, IOException, InterruptedException {
    +        if (currentBatchValues.size() == 0) return;
    +        context.getCounter(PhoenixScrutinyJobCounters.BATCHES_PROCESSED_COUNT).increment(1);
    +        // our query selection filter should be the PK columns of the target table (index or data
    +        // table)
    +        String inClause =
    +                QueryUtil.constructParameterizedInClause(numTargetPkCols,
    +                    currentBatchValues.size());
    +        String indexQuery = targetTableQuery + inClause;
    +        PreparedStatement targetStatement = connection.prepareStatement(indexQuery);
    +
    +        // while we build the PreparedStatement, we also maintain a hash of the target table PKs,
    +        // which we use to join against the results of the query on the target table
    +        Map<Integer, List<Object>> targetPkToSourceValues = buildTargetStatement(targetStatement);
    +
    +        // fetch results from the target table and output invalid rows
    +        queryTargetTable(context, targetStatement, targetPkToSourceValues);
    +
    +        // any source values we have left over are invalid (e.g. data table rows without
    +        // corresponding index row)
    +        context.getCounter(PhoenixScrutinyJobCounters.INVALID_ROW_COUNT)
    +                .increment(targetPkToSourceValues.size());
    +        if (outputInvalidRows) {
    +            for (List<Object> sourceRowWithoutTargetRow : targetPkToSourceValues.values()) {
    +                if (OutputFormat.FILE.equals(outputFormat)) {
    +                    context.write(new Text(Arrays.toString(sourceRowWithoutTargetRow.toArray())),
    +                        new Text("Target row not found"));
    +                } else if (OutputFormat.TABLE.equals(outputFormat)) {
    +                    writeToOutputTable(context, sourceRowWithoutTargetRow, null);
    +                }
    +            }
    +        }
    +        if (outputInvalidRows && OutputFormat.TABLE.equals(outputFormat)) {
    +            outputUpsertStmt.executeBatch(); // write out invalid rows to output table
    +            outputConn.commit();
    +        }
    +        currentBatchValues.clear();
    +    }
    +
    +    private Map<Integer, List<Object>> buildTargetStatement(PreparedStatement targetStatement)
    +            throws SQLException {
    +        Map<Integer, List<Object>> targetPkToSourceValues =
    +                new HashMap<>(currentBatchValues.size());
    +        int rsIndex = 1;
    +        for (List<Object> batchRow : currentBatchValues) {
    +            // our original query against the source table (which provided the batchRow) projected
    +            // with the data table PK cols first, so the first numTargetPkCols form the PK
    +            int targetPkHash = getPkHash(batchRow.subList(0, numTargetPkCols));
    +            targetPkToSourceValues.put(targetPkHash, batchRow);
    +            for (int i = 0; i < numTargetPkCols; i++) {
    +                ColumnInfo targetPkInfo = targetTblColumnMetadata.get(i);
    +                Object value = batchRow.get(i);
    +                if (value == null) {
    +                    targetStatement.setNull(rsIndex++, targetPkInfo.getSqlType());
    +                } else {
    +                    targetStatement.setObject(rsIndex++, value, targetPkInfo.getSqlType());
    +                }
    +            }
    +        }
    +        return targetPkToSourceValues;
    +    }
    +
    +    private void queryTargetTable(Context context, PreparedStatement targetStatement,
    +            Map<Integer, List<Object>> targetPkToSourceValues)
    +            throws SQLException, IOException, InterruptedException {
    +        ResultSet targetResultSet = targetStatement.executeQuery();
    +        while (targetResultSet.next()) {
    +            indxWritable.readFields(targetResultSet);
    +            List<Object> targetValues = indxWritable.getValues();
    +            // first grab the PK and try to join against the source input
    +            // the query is such that first numTargetPkCols of the resultSet is the PK
    +            List<Object> pkObjects = new ArrayList<>(numTargetPkCols);
    +            for (int i = 0; i < numTargetPkCols; i++) {
    +                Object pkPart = targetResultSet.getObject(i + 1);
    --- End diff --
    
    If you want to get at the timestamp of the underlying cell, you can do something like the following:
    
       targetResultSet.unwrap(PhoenixResultSet.class).getCurrentRow().get(0).getTimestamp();
    
    This gives you the timestamp of the first Cell from the scan that was executed (we return only a single Cell, with the selected expressions packed into its value). Not sure whether this is useful, but I wanted to pass it along.
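
    For readers following the diff, the batch join that processBatch performs (index the source rows by target PK, remove each one as its target row comes back, and treat whatever is left as invalid) can be sketched in plain Java. This is only an illustration, not the code in the pull request: the class and method names are hypothetical, no Phoenix or Hadoop classes are used, the map is keyed by the PK values themselves rather than by a hash as in the diff, and the value comparison is an assumption because that step is cut off in the quoted diff.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java sketch of the batch join; hypothetical names, no Phoenix classes. */
public class BatchJoinSketch {

    /**
     * Returns the source rows that have no matching target row, or whose
     * target row differs. The first numPkCols entries of each row form the PK.
     */
    public static List<List<Object>> findInvalidRows(List<List<Object>> sourceBatch,
            List<List<Object>> targetRows, int numPkCols) {
        // Assumption for this sketch: key by the PK values, not by a hash code.
        Map<List<Object>, List<Object>> pkToSource = new HashMap<>();
        for (List<Object> row : sourceBatch) {
            pkToSource.put(new ArrayList<>(row.subList(0, numPkCols)), row);
        }
        List<List<Object>> invalid = new ArrayList<>();
        for (List<Object> target : targetRows) {
            List<Object> pk = new ArrayList<>(target.subList(0, numPkCols));
            List<Object> source = pkToSource.remove(pk);
            // Assumption: a row is a mismatch when any column value differs.
            if (source != null && !source.equals(target)) {
                invalid.add(source);
            }
        }
        // Anything still in the map had no target row at all.
        invalid.addAll(pkToSource.values());
        return invalid;
    }

    public static void main(String[] args) {
        List<List<Object>> source = Arrays.asList(
            Arrays.<Object>asList(1, "a"),
            Arrays.<Object>asList(2, "b"),
            Arrays.<Object>asList(3, "c"));
        List<List<Object>> target = Arrays.asList(
            Arrays.<Object>asList(1, "a"),
            Arrays.<Object>asList(2, "x"));
        // Row 2 differs in the target and row 3 is missing from it.
        System.out.println(findInvalidRows(source, target, 1));
    }
}
```

    In the real mapper, a cell timestamp obtained as described above could be recorded alongside each mismatched row; the sketch leaves that out.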
    
        

