You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by al...@apache.org on 2022/03/18 15:28:11 UTC

[ignite] branch sql-calcite updated (c506f48 -> b597ae5)

This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a change to branch sql-calcite
in repository https://gitbox.apache.org/repos/asf/ignite.git.


    from c506f48  IGNITE-16701 Fix merge left join buffering problem - Fixes #9895.
     add 8b04bf8  IGNITE-16703 fix variable delayed expansion at the win cmd script
     add 3aed85a  IGNITE-16698 Fixed metrics in the IgniteStripedThreadPoolExecutor (#9893)
     add 04abcde  IGNITE-16708 Fix sqlline.bat windows shell scripts (#9898)
     add 71ff768  IGNITE-16702 Checkstyle Indentation rule added (#9897)
     new b597ae5  Merge remote-tracking branch 'remotes/upstream/master' into sql-calcite

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 checkstyle/checkstyle.xml                          |    2 +
 .../computegrid/ComputeBroadcastExample.java       |    2 +-
 .../CatboostClassificationModelParserExample.java  |    4 +-
 .../CatboostRegressionModelParserExample.java      |   10 +-
 .../gridify/GridifySingleSplitLoadTest.java        |    8 +-
 .../GridDhtPartitionsStateValidatorBenchmark.java  |    6 +-
 .../jol/FileStoreHeapUtilizationJolBenchmark.java  |   10 +-
 .../query/calcite/exec/ExecutionServiceImpl.java   |    2 +-
 .../query/calcite/exec/exp/ConverterUtils.java     |   36 +-
 .../query/calcite/exec/exp/RexExecutorImpl.java    |  245 +++--
 .../query/calcite/exec/exp/RexToLixTranslator.java |    2 +-
 .../query/calcite/message/ErrorMessage.java        |    2 +-
 .../query/calcite/metadata/IgniteMdRowCount.java   |   12 +-
 .../query/calcite/metadata/cost/IgniteCost.java    |    6 +-
 .../calcite/prepare/IgniteSqlToRelConvertor.java   |    8 +-
 .../prepare/ddl/DdlSqlToCommandConverter.java      |   16 +-
 .../processors/query/calcite/CancelTest.java       |    8 +-
 .../cassandra/common/PropertyMappingHelper.java    |   11 +-
 .../apache/ignite/tests/load/PersonGenerator.java  |   12 +-
 .../common/AbstractEventSecurityContextTest.java   |   17 +-
 .../ignite/common/NodeSslConnectionMetricTest.java |    4 +-
 .../common/RunningQueryInfoCheckInitiatorTest.java |   24 +-
 .../internal/client/ClientStartNodeTask.java       |   14 +-
 .../client/integration/ClientAbstractSelfTest.java |    3 +-
 .../jdbc2/JdbcStatementBatchingSelfTest.java       |    3 +-
 ...thenticatorUserManagementAuthorizationTest.java |    2 +-
 .../ignite/jdbc/thin/JdbcThinBulkLoadSelfTest.java |    8 +-
 .../JdbcThinCacheToJdbcDataTypesCoverageTest.java  |   39 +-
 .../jdbc/thin/JdbcThinDefaultTimeoutTest.java      |   13 +-
 .../thin/JdbcThinMissingLongArrayResultsTest.java  |    6 +-
 .../commandline/ClusterChangeTagCommand.java       |    2 +-
 .../diagnostic/ConnectivityCommand.java            |    4 +-
 .../encryption/EncryptionSubcommands.java          |    2 +-
 .../snapshot/SnapshotRestoreCommand.java           |    8 +-
 .../TracingConfigurationArguments.java             |    2 +-
 .../util/GridCommandHandlerMetadataTest.java       |   11 +-
 .../apache/ignite/util/GridCommandHandlerTest.java |   16 +-
 .../util/PerformanceStatisticsCommandTest.java     |   18 +-
 .../ignite/compute/gridify/GridifySetToValue.java  |    2 +-
 .../ignite/internal/GridTaskSessionImpl.java       |    2 +-
 .../org/apache/ignite/internal/IgniteKernal.java   |    2 +-
 .../apache/ignite/internal/IgniteMXBeanImpl.java   |    2 +-
 .../ignite/internal/IgniteMessagingImpl.java       |    2 +-
 .../org/apache/ignite/internal/IgnitionEx.java     |    6 +-
 .../internal/binary/BinaryReaderHandles.java       |    2 +-
 .../ignite/internal/binary/BinaryWriterExImpl.java |   30 +-
 .../ignite/internal/client/GridClientCompute.java  |    2 +-
 .../internal/client/thin/ReliableChannel.java      |    2 +-
 .../internal/executor/GridExecutorService.java     |   10 +-
 .../internal/jdbc/thin/ConnectionProperties.java   |    2 +-
 .../jdbc/thin/ConnectionPropertiesImpl.java        |   73 +-
 .../managers/communication/GridIoManager.java      |    2 +-
 .../managers/deployment/GridDeploymentManager.java |    2 +-
 .../protocol/gg/GridProtocolHandler.java           |    2 +-
 .../optimized/OptimizedMarshallerUtils.java        |    4 +-
 .../affinity/HistoryAffinityAssignmentImpl.java    |    2 +-
 .../IgniteAuthenticationProcessor.java             |    6 +-
 .../cache/CacheAffinitySharedManager.java          |    2 +-
 .../cache/CacheClientReconnectDiscoveryData.java   |    2 +-
 .../cache/CacheJoinNodeDiscoveryData.java          |    2 +-
 .../cache/CacheWeakQueryIteratorsHolder.java       |    2 +-
 .../processors/cache/GridCacheAdapter.java         |   45 +-
 .../processors/cache/GridCacheIoManager.java       |    2 +-
 .../cache/GridCachePartitionExchangeManager.java   |    2 +-
 .../processors/cache/GridCacheProcessor.java       |   51 +-
 .../processors/cache/GridCacheSharedContext.java   |    2 +-
 .../processors/cache/IgniteCacheExpiryPolicy.java  |    4 +-
 .../cache/IgniteCacheOffheapManagerImpl.java       |   10 +-
 .../processors/cache/IgniteCacheProxyImpl.java     |    4 +-
 .../dht/CacheDistributedGetFutureAdapter.java      |   21 +-
 .../cache/distributed/dht/GridDhtTxRemote.java     |    4 +-
 .../distributed/dht/atomic/GridDhtAtomicCache.java |   16 +-
 .../dht/atomic/GridNearAtomicUpdateResponse.java   |    4 +-
 .../dht/preloader/GridDhtPartitionDemander.java    |   10 +-
 .../cache/distributed/near/GridNearCacheEntry.java |    4 +-
 .../cache/distributed/near/GridNearTxLocal.java    |   30 +-
 .../processors/cache/mvcc/txlog/TxLog.java         |   16 +-
 .../GridCacheDatabaseSharedManager.java            |   31 +-
 .../dumpprocessors/ToStringDumpProcessor.java      |    6 +-
 .../cache/persistence/file/AsyncFileIO.java        |    4 +-
 .../persistence/file/FilePageStoreManager.java     |   17 +-
 .../metastorage/MetastoragePageIOUtils.java        |   49 +-
 .../pagemem/DelayedPageReplacementTracker.java     |    3 +-
 .../cache/persistence/pagemem/PageMemoryEx.java    |   18 +-
 .../snapshot/SnapshotMetadataCollectorTask.java    |   16 +-
 .../snapshot/SnapshotResponseRemoteFutureTask.java |   31 +-
 .../snapshot/SnapshotRestoreProcess.java           |   24 +-
 .../cache/persistence/tree/BPlusTree.java          |    2 +-
 .../cache/persistence/wal/crc/PureJavaCrc32.java   | 1024 ++++++++++----------
 .../wal/serializer/TxRecordSerializer.java         |    2 +-
 .../cache/query/GridCacheQueryManager.java         |    3 +-
 .../cache/transactions/IgniteInternalTx.java       |    2 +-
 .../cache/transactions/IgniteTxManager.java        |   11 +-
 .../cluster/GridClusterStateProcessor.java         |    8 +-
 .../processors/datastreamer/DataStreamerImpl.java  |    2 +-
 .../processors/odbc/ClientListenerNioListener.java |    2 +-
 .../processors/odbc/jdbc/JdbcRequestHandler.java   |    2 +-
 .../platform/callback/PlatformCallbackGateway.java |    2 +-
 .../client/cache/ClientCacheGetAllRequest.java     |    2 +-
 .../cache/ClientCacheQueryContinuousRequest.java   |    4 +-
 .../pool/MetricsAwareExecutorService.java}         |   15 +-
 .../internal/processors/pool/PoolProcessor.java    |  143 +--
 .../processors/query/GridQueryProcessor.java       |   10 +-
 .../handlers/cache/GridCacheCommandHandler.java    |    2 +-
 .../handlers/top/GridTopologyCommandHandler.java   |    8 +-
 .../internal/processors/task/GridTaskWorker.java   |    4 +-
 .../apache/ignite/internal/util/IgniteUtils.java   |    2 +-
 .../ignite/internal/util/StripedExecutor.java      |   44 +-
 .../internal/util/future/GridFutureAdapter.java    |    2 +-
 .../internal/util/ipc/IpcEndpointFactory.java      |    2 +-
 .../offheap/unsafe/GridUnsafePartitionedMap.java   |    2 +-
 .../util/tostring/GridToStringBuilder.java         |   51 +-
 .../internal/visor/cache/VisorCachePartitions.java |    4 +-
 ...orFindAndDeleteGarbageInPersistenceClosure.java |    8 +-
 .../consistency/VisorConsistencyRepairTask.java    |    2 +-
 .../visor/misc/VisorClusterChangeTagTask.java      |    6 +-
 .../apache/ignite/mxbean/ClusterMetricsMXBean.java |   28 +-
 .../org/apache/ignite/mxbean/IgniteMXBean.java     |   10 +-
 .../org/apache/ignite/spi/IgniteSpiAdapter.java    |    7 +-
 .../spi/checkpoint/jdbc/JdbcCheckpointSpi.java     |    2 +-
 .../jobstealing/JobStealingCollisionSpi.java       |    2 +-
 .../tcp/internal/ConnectionClientPool.java         |    2 +-
 .../tcp/internal/InboundConnectionHandler.java     |    4 +-
 .../TcpCommunicationConfigInitializer.java         |    2 +-
 .../ignite/spi/discovery/DiscoverySpiMBean.java    |   14 +-
 .../ignite/spi/discovery/tcp/ClientImpl.java       |    4 +-
 .../ignite/spi/discovery/tcp/ServerImpl.java       |    2 +-
 .../thread/IgniteStripedThreadPoolExecutor.java    |  133 ++-
 .../ignite/thread/IgniteThreadPoolExecutor.java    |   39 +-
 .../RebalanceAfterResettingLostPartitionTest.java  |   15 +-
 ...finityFunctionBackupFilterAbstractSelfTest.java |    2 +-
 ...rNodeAttributeAffinityBackupFilterSelfTest.java |    2 +-
 .../store/IgniteCacheExpiryStoreLoadSelfTest.java  |    4 +-
 .../ignite/internal/ClusterGroupAbstractTest.java  |    6 +-
 .../GridCacheHashMapPutAllWarningsTest.java        |    6 +-
 .../GridFailoverCustomTopologySelfTest.java        |    4 +-
 .../ignite/internal/GridListenActorSelfTest.java   |    2 +-
 .../GridTaskFutureImplStopGridSelfTest.java        |    2 +-
 .../internal/IgniteClientReconnectLockTest.java    |   19 +-
 .../IgniteExplicitImplicitDeploymentSelfTest.java  |  310 +++---
 .../internal/MemoryLeaksOnRestartNodeTest.java     |    2 +-
 .../internal/binary/BinaryObjectToStringTest.java  |   20 +-
 .../internal/cluster/IgniteClusterIdTagTest.java   |   18 +-
 .../IgniteMessageFactoryImplTest.java              |    6 +-
 .../affinity/GridAffinityAssignmentV2Test.java     |   78 +-
 .../AuthenticationConfigurationClusterTest.java    |    8 +-
 .../AuthenticationProcessorNodeRestartTest.java    |    4 +-
 .../AuthenticationProcessorSelfTest.java           |    4 +-
 .../cache/CacheNearReaderUpdateTest.java           |    2 +-
 .../cache/CacheNoAffinityExchangeTest.java         |    8 +-
 .../CacheStoreUsageMultinodeAbstractTest.java      |    7 +-
 .../cache/CacheValidatorMetricsTest.java           |    8 +-
 .../EntryVersionConsistencyReadThroughTest.java    |    8 +-
 ...acheAtomicEntryProcessorDeploymentSelfTest.java |    2 +-
 .../cache/GridCachePutAllFailoverSelfTest.java     |    2 +-
 .../GridCacheQuerySqlFieldInlineSizeSelfTest.java  |    8 +-
 .../GridCacheValueConsistencyAbstractSelfTest.java |    8 +-
 .../IgniteCacheConfigVariationsFullApiTest.java    |    8 +-
 .../cache/IgniteClientCacheStartFailoverTest.java  |    2 +-
 .../cache/WithKeepBinaryCacheFullApiTest.java      |  120 +--
 .../cache/binary/BinaryMetadataRemoveTest.java     |    8 +-
 .../GridCacheBinaryObjectsAbstractSelfTest.java    |    8 +-
 ...IgniteExchangeLatchManagerDiscoHistoryTest.java |   40 +-
 .../CacheClientsConcurrentStartTest.java           |    2 +-
 .../distributed/CacheDetectLostPartitionsTest.java |   14 +-
 .../distributed/CacheRentingStateRepairTest.java   |   22 +-
 .../distributed/GridExchangeFreeSwitchTest.java    |   17 +-
 .../dht/GridCacheColocatedDebugTest.java           |    2 +-
 ...GridCachePartitionedTopologyChangeSelfTest.java |    2 +-
 ...idCachePartitionsUpdateCountersAndSizeTest.java |    4 +-
 ...dCacheAtomicOnheapMultiNodeFullApiSelfTest.java |    4 +-
 .../cache/mvcc/CacheMvccAbstractFeatureTest.java   |    8 +-
 .../cache/mvcc/CacheMvccClusterRestartTest.java    |    2 +-
 ...eMvccIteratorWithConcurrentTransactionTest.java |   30 +-
 ...cLocalEntriesWithConcurrentTransactionTest.java |   30 +-
 ...MvccScanQueryWithConcurrentTransactionTest.java |   44 +-
 ...CacheMvccSizeWithConcurrentTransactionTest.java |   20 +-
 .../IgnitePdsBinaryMetadataAsyncWritingTest.java   |    2 +-
 ...IgnitePdsCacheWalDisabledOnRebalancingTest.java |    2 +-
 .../cache/persistence/db/IgnitePdsWithTtlTest.java |   10 +-
 .../IgniteCheckpointDirtyPagesForLowLoadTest.java  |   15 +-
 .../db/wal/IgniteWalFlushFailoverTest.java         |   10 +-
 .../db/wal/reader/IgniteWalReaderTest.java         |    2 +-
 .../pagemem/RobinHoodBackwardShiftHashMapTest.java |   66 +-
 .../snapshot/IgniteClusterSnapshotCheckTest.java   |    8 +-
 .../snapshot/IgniteSnapshotMXBeanTest.java         |    5 +-
 .../IgniteChangeGlobalStateTest.java               |   12 +-
 .../IgniteNoParrallelClusterIsAllowedTest.java     |    2 +-
 .../join/JoinActiveNodeToActiveCluster.java        |    6 +-
 .../cache/query/CacheScanQueryFailoverTest.java    |    8 +-
 .../cache/query/IndexingSpiQuerySelfTest.java      |    2 +-
 .../CacheContinuousQueryRandomOperationsTest.java  |    4 +-
 ...ontinuousWithTransformerReplicatedSelfTest.java |    2 +-
 .../GridCacheContinuousQueryAbstractSelfTest.java  |    6 +-
 ...niteCacheContinuousQueryImmutableEntryTest.java |    2 +-
 ...gniteCacheContinuousQueryNoUnsubscribeTest.java |    2 +-
 .../cache/transactions/DepthFirstSearchTest.java   |  165 ++--
 .../processors/cache/warmup/WarmUpSelfTest.java    |    4 +-
 .../IgniteComputeCustomExecutorSelfTest.java       |   14 +-
 .../processors/database/BPlusTreeSelfTest.java     |    2 +-
 .../odbc/OdbcEscapeSequenceSelfTest.java           |   72 +-
 .../PerformanceStatisticsSelfTest.java             |   20 +-
 ...tinuousQueryRemoteSecurityContextCheckTest.java |    2 +-
 ...mentDiscoveryListenerNotificationOrderTest.java |  180 ++--
 .../internal/util/collection/BitSetIntSetTest.java |   66 +-
 .../lang/GridMetadataAwareAdapterLoadTest.java     |    4 +-
 .../ignite/loadtests/cache/GridCacheLoadTest.java  |    6 +-
 ...GridBoundedConcurrentLinkedHashSetLoadTest.java |   10 +-
 .../security/SecurityPermissionSetBuilderTest.java |   38 +-
 ...dJobStealingCollisionSpiAttributesSelfTest.java |   32 +-
 .../communication/GridCacheMessageSelfTest.java    |    8 +-
 .../tcp/TcpCommunicationSpiMultiJvmTest.java       |   12 +-
 .../FilterDataForClientNodeDiscoveryTest.java      |   10 +-
 .../tcp/TcpDiscoveryNetworkIssuesTest.java         |    2 +-
 .../tcp/TcpDiscoveryWithWrongServerTest.java       |    4 +-
 .../SocketStreamerUnmarshalVulnerabilityTest.java  |    2 +-
 .../testframework/MessageOrderLogListener.java     |    8 +-
 .../ignite/testframework/MvccFeatureChecker.java   |    4 +-
 .../testframework/junits/GridAbstractTest.java     |    3 +-
 .../junits/common/GridCommonAbstractTest.java      |   18 +-
 .../common/GridCommonAbstractTestSelfTest.java     |   99 +-
 .../testframework/wal/record/RecordUtils.java      |  161 +--
 .../IgniteCacheIteratorsSelfTestSuite.java         |    2 +-
 .../ignite/thread/ThreadPoolMetricsTest.java       |  192 +++-
 .../p2p/compute/AveragePersonSalaryCallable.java   |    4 +-
 .../cache/hibernate/HibernateL2CacheSelfTest.java  |    2 +-
 .../processors/query/h2/H2PooledConnection.java    |    2 +-
 .../processors/query/h2/IgniteH2Indexing.java      |    4 +-
 .../processors/query/h2/twostep/SortedReducer.java |    2 +-
 .../query/h2/twostep/UnsortedReducer.java          |    2 +-
 .../query/stat/IgniteGlobalStatisticsManager.java  |   48 +-
 .../visor/verify/ValidateIndexesClosure.java       |   23 +-
 .../ignite/cache/query/IndexQueryFailoverTest.java |   77 +-
 .../cache/query/IndexQueryWrongIndexTest.java      |    4 +-
 .../cache/BinaryTypeMismatchLoggingTest.java       |    7 +-
 .../cache/CacheIteratorScanQueryTest.java          |    2 +-
 .../CacheRandomOperationsMultithreadedTest.java    |    2 +-
 .../IgniteBinaryObjectQueryArgumentsTest.java      |    2 +-
 ...niteBinaryWrappedObjectFieldsQuerySelfTest.java |    3 +-
 .../IgniteCacheAbstractInsertSqlQuerySelfTest.java |   15 +-
 .../cache/IgniteCacheAbstractQuerySelfTest.java    |   12 +-
 .../IgniteCacheConfigVariationsQueryTest.java      |    8 +-
 .../cache/IgniteCacheSqlQueryErrorSelfTest.java    |    2 +-
 .../QueryJoinWithDifferentNodeFiltersTest.java     |    2 +-
 .../cache/index/H2DynamicTableSelfTest.java        |    2 +-
 ...ransactionsCommandsWithMvccEnabledSelfTest.java |   10 +-
 .../cache/index/SqlTransactionsSelfTest.java       |   10 +-
 ...cheMvccSqlTxQueriesWithReducerAbstractTest.java |    2 +-
 .../cache/mvcc/MvccRepeatableReadBulkOpsTest.java  |   10 +-
 .../db/LongDestroyDurableBackgroundTaskTest.java   |    8 +-
 .../persistence/db/wal/IgniteWalRecoveryTest.java  |   43 +-
 .../query/CreateIndexOnInvalidDataTypeTest.java    |   14 +-
 .../query/IgniteQueryDedicatedPoolTest.java        |    9 +-
 .../internal/processors/query/LazyOnDmlTest.java   |    2 +-
 .../query/MemLeakOnSqlWithClientReconnectTest.java |   23 +-
 ...SqlFieldTypeValidationOnKeyValueInsertTest.java |    5 +-
 .../h2/GridSubqueryJoinOptimizerSelfTest.java      |    2 +-
 .../inlinecolumn/ComputeInlineSizeTest.java        |   28 +-
 .../query/h2/sql/H2CompareBigQueryTest.java        |  122 +--
 .../processors/query/stat/BusyExecutorTest.java    |    2 +-
 .../query/stat/SqlStatisticsCommandTests.java      |    7 +-
 .../query/stat/StatisticsGlobalViewTest.java       |    6 +-
 .../ignite/logger/log4j2/Log4j2LoggerSelfTest.java |    5 +-
 .../log4j2/Log4j2LoggerVerboseModeSelfTest.java    |    5 +-
 .../org/apache/ignite/mesos/ClusterProperties.java |    2 +-
 .../ml/catboost/CatboostClassificationModel.java   |   12 +-
 .../ml/catboost/CatboostRegressionModel.java       |   12 +-
 .../ml/catboost/CatboostRegressionModelParser.java |    5 +-
 .../ignite/ml/clustering/kmeans/KMeansModel.java   |   61 +-
 .../PredictionsAggregator.java                     |   11 +-
 .../ignite/ml/knn/ann/ANNClassificationModel.java  |    8 +-
 .../ml/math/distances/BrayCurtisDistance.java      |   58 +-
 .../ignite/ml/math/distances/CanberraDistance.java |   60 +-
 .../ignite/ml/math/distances/DistanceMeasure.java  |   27 +-
 .../ml/math/distances/JensenShannonDistance.java   |  102 +-
 .../math/distances/WeightedMinkowskiDistance.java  |  118 +--
 .../encoding/target/TargetCounter.java             |   80 +-
 .../encoding/target/TargetEncodingMeta.java        |   44 +-
 .../regressions/linear/LinearRegressionModel.java  |   29 +-
 .../logistic/LogisticRegressionModel.java          |   55 +-
 .../ignite/ml/structures/LabeledVectorSet.java     |    4 +-
 .../ml/svm/SVMLinearClassificationModel.java       |   54 +-
 .../ml/trainers/AdaptableDatasetTrainer.java       |    2 +-
 .../apache/ignite/ml/tree/DecisionTreeNode.java    |   19 +-
 .../apache/ignite/ml/common/ExternalizeTest.java   |    2 +-
 .../impl/local/LocalDatasetBuilderTest.java        |   26 +-
 .../ml/dataset/primitive/SimpleDatasetTest.java    |   12 +-
 .../primitive/SimpleLabeledDatasetTest.java        |   12 +-
 .../ml/environment/LearningEnvironmentTest.java    |   10 +-
 .../ml/math/distances/BrayCurtisDistanceTest.java  |  128 +--
 .../ml/math/distances/CanberraDistanceTest.java    |  128 +--
 .../math/distances/JensenShannonDistanceTest.java  |  132 +--
 .../distances/WeightedMinkowskiDistanceTest.java   |  146 +--
 .../vector/DenseVectorConstructorTest.java         |   50 +-
 .../primitives/vector/VectorNormCasesTest.java     |  142 +--
 .../math/primitives/vector/VectorToMatrixTest.java |   16 +-
 .../binarization/BinarizationPreprocessorTest.java |    4 +-
 .../encoding/FrequencyEncoderPreprocessorTest.java |   38 +-
 .../encoding/OneHotEncoderPreprocessorTest.java    |   67 +-
 .../encoding/StringEncoderPreprocessorTest.java    |   38 +-
 .../encoding/TargetEncoderPreprocessorTest.java    |   64 +-
 .../imputing/ImputerPreprocessorTest.java          |    4 +-
 .../MinMaxScalerPreprocessorTest.java              |    4 +-
 .../NormalizationPreprocessorTest.java             |    4 +-
 .../paramgrid/ParameterSetGeneratorTest.java       |    2 +-
 .../BinaryClassificationMetricsTest.java           |   12 +-
 .../metric/regression/RegressionMetricsTest.java   |   12 +-
 .../internal/mem/NumaAllocatorBasicTest.java       |    2 +-
 .../ignite/TracingConfigurationValidationTest.java |    4 +-
 .../spring/IgniteExcludeInConfigurationTest.java   |    4 +-
 modules/sqlline/bin/sqlline.bat                    |    1 +
 .../internal/websession/WebSessionSelfTest.java    |    2 +-
 .../yardstick/cache/CacheEntryEventAsyncProbe.java |    2 +-
 .../cache/IgniteCreateIndexBenchmark.java          |    2 +-
 .../jdbc/vendors/BaseSelectRangeBenchmark.java     |    2 +-
 .../yarn/IgniteApplicationMasterSelfTest.java      |    4 +-
 .../zk/internal/ZkDiscoveryEventsData.java         |    2 +-
 .../zk/internal/ZkDiscoveryNodeLeaveEventData.java |    2 +-
 .../zk/internal/ZookeeperDiscoveryImpl.java        |   56 +-
 319 files changed, 3826 insertions(+), 3674 deletions(-)
 copy modules/core/src/main/java/org/apache/ignite/{binary/BinaryCollectionFactory.java => internal/processors/pool/MetricsAwareExecutorService.java} (70%)

[ignite] 01/01: Merge remote-tracking branch 'remotes/upstream/master' into sql-calcite

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch sql-calcite
in repository https://gitbox.apache.org/repos/asf/ignite.git

commit b597ae54ed39ebf4e6ffc8c8423c0fe943054012
Merge: c506f48 71ff768
Author: Aleksey Plekhanov <pl...@gmail.com>
AuthorDate: Fri Mar 18 18:21:32 2022 +0300

    Merge remote-tracking branch 'remotes/upstream/master' into sql-calcite
    
    # Conflicts:
    #	modules/core/src/main/java/org/apache/ignite/internal/processors/query/SqlClientContext.java
    #	modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsConfigurationManager.java

 checkstyle/checkstyle.xml                          |    2 +
 .../computegrid/ComputeBroadcastExample.java       |    2 +-
 .../CatboostClassificationModelParserExample.java  |    4 +-
 .../CatboostRegressionModelParserExample.java      |   10 +-
 .../gridify/GridifySingleSplitLoadTest.java        |    8 +-
 .../GridDhtPartitionsStateValidatorBenchmark.java  |    6 +-
 .../jol/FileStoreHeapUtilizationJolBenchmark.java  |   10 +-
 .../query/calcite/exec/ExecutionServiceImpl.java   |    2 +-
 .../query/calcite/exec/exp/ConverterUtils.java     |   36 +-
 .../query/calcite/exec/exp/RexExecutorImpl.java    |  245 +++--
 .../query/calcite/exec/exp/RexToLixTranslator.java |    2 +-
 .../query/calcite/message/ErrorMessage.java        |    2 +-
 .../query/calcite/metadata/IgniteMdRowCount.java   |   12 +-
 .../query/calcite/metadata/cost/IgniteCost.java    |    6 +-
 .../calcite/prepare/IgniteSqlToRelConvertor.java   |    8 +-
 .../prepare/ddl/DdlSqlToCommandConverter.java      |   16 +-
 .../processors/query/calcite/CancelTest.java       |    8 +-
 .../cassandra/common/PropertyMappingHelper.java    |   11 +-
 .../apache/ignite/tests/load/PersonGenerator.java  |   12 +-
 .../common/AbstractEventSecurityContextTest.java   |   17 +-
 .../ignite/common/NodeSslConnectionMetricTest.java |    4 +-
 .../common/RunningQueryInfoCheckInitiatorTest.java |   24 +-
 .../internal/client/ClientStartNodeTask.java       |   14 +-
 .../client/integration/ClientAbstractSelfTest.java |    3 +-
 .../jdbc2/JdbcStatementBatchingSelfTest.java       |    3 +-
 ...thenticatorUserManagementAuthorizationTest.java |    2 +-
 .../ignite/jdbc/thin/JdbcThinBulkLoadSelfTest.java |    8 +-
 .../JdbcThinCacheToJdbcDataTypesCoverageTest.java  |   39 +-
 .../jdbc/thin/JdbcThinDefaultTimeoutTest.java      |   13 +-
 .../thin/JdbcThinMissingLongArrayResultsTest.java  |    6 +-
 .../commandline/ClusterChangeTagCommand.java       |    2 +-
 .../diagnostic/ConnectivityCommand.java            |    4 +-
 .../encryption/EncryptionSubcommands.java          |    2 +-
 .../snapshot/SnapshotRestoreCommand.java           |    8 +-
 .../TracingConfigurationArguments.java             |    2 +-
 .../util/GridCommandHandlerMetadataTest.java       |   11 +-
 .../apache/ignite/util/GridCommandHandlerTest.java |   16 +-
 .../util/PerformanceStatisticsCommandTest.java     |   18 +-
 .../ignite/compute/gridify/GridifySetToValue.java  |    2 +-
 .../ignite/internal/GridTaskSessionImpl.java       |    2 +-
 .../org/apache/ignite/internal/IgniteKernal.java   |    2 +-
 .../apache/ignite/internal/IgniteMXBeanImpl.java   |    2 +-
 .../ignite/internal/IgniteMessagingImpl.java       |    2 +-
 .../org/apache/ignite/internal/IgnitionEx.java     |    6 +-
 .../internal/binary/BinaryReaderHandles.java       |    2 +-
 .../ignite/internal/binary/BinaryWriterExImpl.java |   30 +-
 .../ignite/internal/client/GridClientCompute.java  |    2 +-
 .../internal/client/thin/ReliableChannel.java      |    2 +-
 .../internal/executor/GridExecutorService.java     |   10 +-
 .../internal/jdbc/thin/ConnectionProperties.java   |    2 +-
 .../jdbc/thin/ConnectionPropertiesImpl.java        |   73 +-
 .../managers/communication/GridIoManager.java      |    2 +-
 .../managers/deployment/GridDeploymentManager.java |    2 +-
 .../protocol/gg/GridProtocolHandler.java           |    2 +-
 .../optimized/OptimizedMarshallerUtils.java        |    4 +-
 .../affinity/HistoryAffinityAssignmentImpl.java    |    2 +-
 .../IgniteAuthenticationProcessor.java             |    6 +-
 .../cache/CacheAffinitySharedManager.java          |    2 +-
 .../cache/CacheClientReconnectDiscoveryData.java   |    2 +-
 .../cache/CacheJoinNodeDiscoveryData.java          |    2 +-
 .../cache/CacheWeakQueryIteratorsHolder.java       |    2 +-
 .../processors/cache/GridCacheAdapter.java         |   45 +-
 .../processors/cache/GridCacheIoManager.java       |    2 +-
 .../cache/GridCachePartitionExchangeManager.java   |    2 +-
 .../processors/cache/GridCacheProcessor.java       |   51 +-
 .../processors/cache/GridCacheSharedContext.java   |    2 +-
 .../processors/cache/IgniteCacheExpiryPolicy.java  |    4 +-
 .../cache/IgniteCacheOffheapManagerImpl.java       |   10 +-
 .../processors/cache/IgniteCacheProxyImpl.java     |    4 +-
 .../dht/CacheDistributedGetFutureAdapter.java      |   21 +-
 .../cache/distributed/dht/GridDhtTxRemote.java     |    4 +-
 .../distributed/dht/atomic/GridDhtAtomicCache.java |   16 +-
 .../dht/atomic/GridNearAtomicUpdateResponse.java   |    4 +-
 .../dht/preloader/GridDhtPartitionDemander.java    |   10 +-
 .../cache/distributed/near/GridNearCacheEntry.java |    4 +-
 .../cache/distributed/near/GridNearTxLocal.java    |   30 +-
 .../processors/cache/mvcc/txlog/TxLog.java         |   16 +-
 .../GridCacheDatabaseSharedManager.java            |   31 +-
 .../dumpprocessors/ToStringDumpProcessor.java      |    6 +-
 .../cache/persistence/file/AsyncFileIO.java        |    4 +-
 .../persistence/file/FilePageStoreManager.java     |   17 +-
 .../metastorage/MetastoragePageIOUtils.java        |   49 +-
 .../pagemem/DelayedPageReplacementTracker.java     |    3 +-
 .../cache/persistence/pagemem/PageMemoryEx.java    |   18 +-
 .../snapshot/SnapshotMetadataCollectorTask.java    |   16 +-
 .../snapshot/SnapshotResponseRemoteFutureTask.java |   31 +-
 .../snapshot/SnapshotRestoreProcess.java           |   24 +-
 .../cache/persistence/tree/BPlusTree.java          |    2 +-
 .../cache/persistence/wal/crc/PureJavaCrc32.java   | 1024 ++++++++++----------
 .../wal/serializer/TxRecordSerializer.java         |    2 +-
 .../cache/query/GridCacheQueryManager.java         |    3 +-
 .../cache/transactions/IgniteInternalTx.java       |    2 +-
 .../cache/transactions/IgniteTxManager.java        |   11 +-
 .../cluster/GridClusterStateProcessor.java         |    8 +-
 .../processors/datastreamer/DataStreamerImpl.java  |    2 +-
 .../processors/odbc/ClientListenerNioListener.java |    2 +-
 .../processors/odbc/jdbc/JdbcRequestHandler.java   |    2 +-
 .../platform/callback/PlatformCallbackGateway.java |    2 +-
 .../client/cache/ClientCacheGetAllRequest.java     |    2 +-
 .../cache/ClientCacheQueryContinuousRequest.java   |    4 +-
 .../pool/MetricsAwareExecutorService.java}         |   21 +-
 .../internal/processors/pool/PoolProcessor.java    |  143 +--
 .../processors/query/GridQueryProcessor.java       |   10 +-
 .../handlers/cache/GridCacheCommandHandler.java    |    2 +-
 .../handlers/top/GridTopologyCommandHandler.java   |    8 +-
 .../internal/processors/task/GridTaskWorker.java   |    4 +-
 .../apache/ignite/internal/util/IgniteUtils.java   |    2 +-
 .../ignite/internal/util/StripedExecutor.java      |   44 +-
 .../internal/util/future/GridFutureAdapter.java    |    2 +-
 .../internal/util/ipc/IpcEndpointFactory.java      |    2 +-
 .../offheap/unsafe/GridUnsafePartitionedMap.java   |    2 +-
 .../util/tostring/GridToStringBuilder.java         |   51 +-
 .../internal/visor/cache/VisorCachePartitions.java |    4 +-
 ...orFindAndDeleteGarbageInPersistenceClosure.java |    8 +-
 .../consistency/VisorConsistencyRepairTask.java    |    2 +-
 .../visor/misc/VisorClusterChangeTagTask.java      |    6 +-
 .../apache/ignite/mxbean/ClusterMetricsMXBean.java |   28 +-
 .../org/apache/ignite/mxbean/IgniteMXBean.java     |   10 +-
 .../org/apache/ignite/spi/IgniteSpiAdapter.java    |    7 +-
 .../spi/checkpoint/jdbc/JdbcCheckpointSpi.java     |    2 +-
 .../jobstealing/JobStealingCollisionSpi.java       |    2 +-
 .../tcp/internal/ConnectionClientPool.java         |    2 +-
 .../tcp/internal/InboundConnectionHandler.java     |    4 +-
 .../TcpCommunicationConfigInitializer.java         |    2 +-
 .../ignite/spi/discovery/DiscoverySpiMBean.java    |   14 +-
 .../ignite/spi/discovery/tcp/ClientImpl.java       |    4 +-
 .../ignite/spi/discovery/tcp/ServerImpl.java       |    2 +-
 .../thread/IgniteStripedThreadPoolExecutor.java    |  133 ++-
 .../ignite/thread/IgniteThreadPoolExecutor.java    |   39 +-
 .../RebalanceAfterResettingLostPartitionTest.java  |   15 +-
 ...finityFunctionBackupFilterAbstractSelfTest.java |    2 +-
 ...rNodeAttributeAffinityBackupFilterSelfTest.java |    2 +-
 .../store/IgniteCacheExpiryStoreLoadSelfTest.java  |    4 +-
 .../ignite/internal/ClusterGroupAbstractTest.java  |    6 +-
 .../GridCacheHashMapPutAllWarningsTest.java        |    6 +-
 .../GridFailoverCustomTopologySelfTest.java        |    4 +-
 .../ignite/internal/GridListenActorSelfTest.java   |    2 +-
 .../GridTaskFutureImplStopGridSelfTest.java        |    2 +-
 .../internal/IgniteClientReconnectLockTest.java    |   19 +-
 .../IgniteExplicitImplicitDeploymentSelfTest.java  |  310 +++---
 .../internal/MemoryLeaksOnRestartNodeTest.java     |    2 +-
 .../internal/binary/BinaryObjectToStringTest.java  |   20 +-
 .../internal/cluster/IgniteClusterIdTagTest.java   |   18 +-
 .../IgniteMessageFactoryImplTest.java              |    6 +-
 .../affinity/GridAffinityAssignmentV2Test.java     |   78 +-
 .../AuthenticationConfigurationClusterTest.java    |    8 +-
 .../AuthenticationProcessorNodeRestartTest.java    |    4 +-
 .../AuthenticationProcessorSelfTest.java           |    4 +-
 .../cache/CacheNearReaderUpdateTest.java           |    2 +-
 .../cache/CacheNoAffinityExchangeTest.java         |    8 +-
 .../CacheStoreUsageMultinodeAbstractTest.java      |    7 +-
 .../cache/CacheValidatorMetricsTest.java           |    8 +-
 .../EntryVersionConsistencyReadThroughTest.java    |    8 +-
 ...acheAtomicEntryProcessorDeploymentSelfTest.java |    2 +-
 .../cache/GridCachePutAllFailoverSelfTest.java     |    2 +-
 .../GridCacheQuerySqlFieldInlineSizeSelfTest.java  |    8 +-
 .../GridCacheValueConsistencyAbstractSelfTest.java |    8 +-
 .../IgniteCacheConfigVariationsFullApiTest.java    |    8 +-
 .../cache/IgniteClientCacheStartFailoverTest.java  |    2 +-
 .../cache/WithKeepBinaryCacheFullApiTest.java      |  120 +--
 .../cache/binary/BinaryMetadataRemoveTest.java     |    8 +-
 .../GridCacheBinaryObjectsAbstractSelfTest.java    |    8 +-
 ...IgniteExchangeLatchManagerDiscoHistoryTest.java |   40 +-
 .../CacheClientsConcurrentStartTest.java           |    2 +-
 .../distributed/CacheDetectLostPartitionsTest.java |   14 +-
 .../distributed/CacheRentingStateRepairTest.java   |   22 +-
 .../distributed/GridExchangeFreeSwitchTest.java    |   17 +-
 .../dht/GridCacheColocatedDebugTest.java           |    2 +-
 ...GridCachePartitionedTopologyChangeSelfTest.java |    2 +-
 ...idCachePartitionsUpdateCountersAndSizeTest.java |    4 +-
 ...dCacheAtomicOnheapMultiNodeFullApiSelfTest.java |    4 +-
 .../cache/mvcc/CacheMvccAbstractFeatureTest.java   |    8 +-
 .../cache/mvcc/CacheMvccClusterRestartTest.java    |    2 +-
 ...eMvccIteratorWithConcurrentTransactionTest.java |   30 +-
 ...cLocalEntriesWithConcurrentTransactionTest.java |   30 +-
 ...MvccScanQueryWithConcurrentTransactionTest.java |   44 +-
 ...CacheMvccSizeWithConcurrentTransactionTest.java |   20 +-
 .../IgnitePdsBinaryMetadataAsyncWritingTest.java   |    2 +-
 ...IgnitePdsCacheWalDisabledOnRebalancingTest.java |    2 +-
 .../cache/persistence/db/IgnitePdsWithTtlTest.java |   10 +-
 .../IgniteCheckpointDirtyPagesForLowLoadTest.java  |   15 +-
 .../db/wal/IgniteWalFlushFailoverTest.java         |   10 +-
 .../db/wal/reader/IgniteWalReaderTest.java         |    2 +-
 .../pagemem/RobinHoodBackwardShiftHashMapTest.java |   66 +-
 .../snapshot/IgniteClusterSnapshotCheckTest.java   |    8 +-
 .../snapshot/IgniteSnapshotMXBeanTest.java         |    5 +-
 .../IgniteChangeGlobalStateTest.java               |   12 +-
 .../IgniteNoParrallelClusterIsAllowedTest.java     |    2 +-
 .../join/JoinActiveNodeToActiveCluster.java        |    6 +-
 .../cache/query/CacheScanQueryFailoverTest.java    |    8 +-
 .../cache/query/IndexingSpiQuerySelfTest.java      |    2 +-
 .../CacheContinuousQueryRandomOperationsTest.java  |    4 +-
 ...ontinuousWithTransformerReplicatedSelfTest.java |    2 +-
 .../GridCacheContinuousQueryAbstractSelfTest.java  |    6 +-
 ...niteCacheContinuousQueryImmutableEntryTest.java |    2 +-
 ...gniteCacheContinuousQueryNoUnsubscribeTest.java |    2 +-
 .../cache/transactions/DepthFirstSearchTest.java   |  165 ++--
 .../processors/cache/warmup/WarmUpSelfTest.java    |    4 +-
 .../IgniteComputeCustomExecutorSelfTest.java       |   14 +-
 .../processors/database/BPlusTreeSelfTest.java     |    2 +-
 .../odbc/OdbcEscapeSequenceSelfTest.java           |   72 +-
 .../PerformanceStatisticsSelfTest.java             |   20 +-
 ...tinuousQueryRemoteSecurityContextCheckTest.java |    2 +-
 ...mentDiscoveryListenerNotificationOrderTest.java |  180 ++--
 .../internal/util/collection/BitSetIntSetTest.java |   66 +-
 .../lang/GridMetadataAwareAdapterLoadTest.java     |    4 +-
 .../ignite/loadtests/cache/GridCacheLoadTest.java  |    6 +-
 ...GridBoundedConcurrentLinkedHashSetLoadTest.java |   10 +-
 .../security/SecurityPermissionSetBuilderTest.java |   38 +-
 ...dJobStealingCollisionSpiAttributesSelfTest.java |   32 +-
 .../communication/GridCacheMessageSelfTest.java    |    8 +-
 .../tcp/TcpCommunicationSpiMultiJvmTest.java       |   12 +-
 .../FilterDataForClientNodeDiscoveryTest.java      |   10 +-
 .../tcp/TcpDiscoveryNetworkIssuesTest.java         |    2 +-
 .../tcp/TcpDiscoveryWithWrongServerTest.java       |    4 +-
 .../SocketStreamerUnmarshalVulnerabilityTest.java  |    2 +-
 .../testframework/MessageOrderLogListener.java     |    8 +-
 .../ignite/testframework/MvccFeatureChecker.java   |    4 +-
 .../testframework/junits/GridAbstractTest.java     |    3 +-
 .../junits/common/GridCommonAbstractTest.java      |   18 +-
 .../common/GridCommonAbstractTestSelfTest.java     |   99 +-
 .../testframework/wal/record/RecordUtils.java      |  161 +--
 .../IgniteCacheIteratorsSelfTestSuite.java         |    2 +-
 .../ignite/thread/ThreadPoolMetricsTest.java       |  192 +++-
 .../p2p/compute/AveragePersonSalaryCallable.java   |    4 +-
 .../cache/hibernate/HibernateL2CacheSelfTest.java  |    2 +-
 .../processors/query/h2/H2PooledConnection.java    |    2 +-
 .../processors/query/h2/IgniteH2Indexing.java      |    4 +-
 .../processors/query/h2/twostep/SortedReducer.java |    2 +-
 .../query/h2/twostep/UnsortedReducer.java          |    2 +-
 .../query/stat/IgniteGlobalStatisticsManager.java  |   48 +-
 .../visor/verify/ValidateIndexesClosure.java       |   23 +-
 .../ignite/cache/query/IndexQueryFailoverTest.java |   77 +-
 .../cache/query/IndexQueryWrongIndexTest.java      |    4 +-
 .../cache/BinaryTypeMismatchLoggingTest.java       |    7 +-
 .../cache/CacheIteratorScanQueryTest.java          |    2 +-
 .../CacheRandomOperationsMultithreadedTest.java    |    2 +-
 .../IgniteBinaryObjectQueryArgumentsTest.java      |    2 +-
 ...niteBinaryWrappedObjectFieldsQuerySelfTest.java |    3 +-
 .../IgniteCacheAbstractInsertSqlQuerySelfTest.java |   15 +-
 .../cache/IgniteCacheAbstractQuerySelfTest.java    |   12 +-
 .../IgniteCacheConfigVariationsQueryTest.java      |    8 +-
 .../cache/IgniteCacheSqlQueryErrorSelfTest.java    |    2 +-
 .../QueryJoinWithDifferentNodeFiltersTest.java     |    2 +-
 .../cache/index/H2DynamicTableSelfTest.java        |    2 +-
 ...ransactionsCommandsWithMvccEnabledSelfTest.java |   10 +-
 .../cache/index/SqlTransactionsSelfTest.java       |   10 +-
 ...cheMvccSqlTxQueriesWithReducerAbstractTest.java |    2 +-
 .../cache/mvcc/MvccRepeatableReadBulkOpsTest.java  |   10 +-
 .../db/LongDestroyDurableBackgroundTaskTest.java   |    8 +-
 .../persistence/db/wal/IgniteWalRecoveryTest.java  |   43 +-
 .../query/CreateIndexOnInvalidDataTypeTest.java    |   14 +-
 .../query/IgniteQueryDedicatedPoolTest.java        |    9 +-
 .../internal/processors/query/LazyOnDmlTest.java   |    2 +-
 .../query/MemLeakOnSqlWithClientReconnectTest.java |   23 +-
 ...SqlFieldTypeValidationOnKeyValueInsertTest.java |    5 +-
 .../h2/GridSubqueryJoinOptimizerSelfTest.java      |    2 +-
 .../inlinecolumn/ComputeInlineSizeTest.java        |   28 +-
 .../query/h2/sql/H2CompareBigQueryTest.java        |  122 +--
 .../processors/query/stat/BusyExecutorTest.java    |    2 +-
 .../query/stat/SqlStatisticsCommandTests.java      |    7 +-
 .../query/stat/StatisticsGlobalViewTest.java       |    6 +-
 .../ignite/logger/log4j2/Log4j2LoggerSelfTest.java |    5 +-
 .../log4j2/Log4j2LoggerVerboseModeSelfTest.java    |    5 +-
 .../org/apache/ignite/mesos/ClusterProperties.java |    2 +-
 .../ml/catboost/CatboostClassificationModel.java   |   12 +-
 .../ml/catboost/CatboostRegressionModel.java       |   12 +-
 .../ml/catboost/CatboostRegressionModelParser.java |    5 +-
 .../ignite/ml/clustering/kmeans/KMeansModel.java   |   61 +-
 .../PredictionsAggregator.java                     |   11 +-
 .../ignite/ml/knn/ann/ANNClassificationModel.java  |    8 +-
 .../ml/math/distances/BrayCurtisDistance.java      |   58 +-
 .../ignite/ml/math/distances/CanberraDistance.java |   60 +-
 .../ignite/ml/math/distances/DistanceMeasure.java  |   27 +-
 .../ml/math/distances/JensenShannonDistance.java   |  102 +-
 .../math/distances/WeightedMinkowskiDistance.java  |  118 +--
 .../encoding/target/TargetCounter.java             |   80 +-
 .../encoding/target/TargetEncodingMeta.java        |   44 +-
 .../regressions/linear/LinearRegressionModel.java  |   29 +-
 .../logistic/LogisticRegressionModel.java          |   55 +-
 .../ignite/ml/structures/LabeledVectorSet.java     |    4 +-
 .../ml/svm/SVMLinearClassificationModel.java       |   54 +-
 .../ml/trainers/AdaptableDatasetTrainer.java       |    2 +-
 .../apache/ignite/ml/tree/DecisionTreeNode.java    |   19 +-
 .../apache/ignite/ml/common/ExternalizeTest.java   |    2 +-
 .../impl/local/LocalDatasetBuilderTest.java        |   26 +-
 .../ml/dataset/primitive/SimpleDatasetTest.java    |   12 +-
 .../primitive/SimpleLabeledDatasetTest.java        |   12 +-
 .../ml/environment/LearningEnvironmentTest.java    |   10 +-
 .../ml/math/distances/BrayCurtisDistanceTest.java  |  128 +--
 .../ml/math/distances/CanberraDistanceTest.java    |  128 +--
 .../math/distances/JensenShannonDistanceTest.java  |  132 +--
 .../distances/WeightedMinkowskiDistanceTest.java   |  146 +--
 .../vector/DenseVectorConstructorTest.java         |   50 +-
 .../primitives/vector/VectorNormCasesTest.java     |  142 +--
 .../math/primitives/vector/VectorToMatrixTest.java |   16 +-
 .../binarization/BinarizationPreprocessorTest.java |    4 +-
 .../encoding/FrequencyEncoderPreprocessorTest.java |   38 +-
 .../encoding/OneHotEncoderPreprocessorTest.java    |   67 +-
 .../encoding/StringEncoderPreprocessorTest.java    |   38 +-
 .../encoding/TargetEncoderPreprocessorTest.java    |   64 +-
 .../imputing/ImputerPreprocessorTest.java          |    4 +-
 .../MinMaxScalerPreprocessorTest.java              |    4 +-
 .../NormalizationPreprocessorTest.java             |    4 +-
 .../paramgrid/ParameterSetGeneratorTest.java       |    2 +-
 .../BinaryClassificationMetricsTest.java           |   12 +-
 .../metric/regression/RegressionMetricsTest.java   |   12 +-
 .../internal/mem/NumaAllocatorBasicTest.java       |    2 +-
 .../ignite/TracingConfigurationValidationTest.java |    4 +-
 .../spring/IgniteExcludeInConfigurationTest.java   |    4 +-
 modules/sqlline/bin/sqlline.bat                    |    1 +
 .../internal/websession/WebSessionSelfTest.java    |    2 +-
 .../yardstick/cache/CacheEntryEventAsyncProbe.java |    2 +-
 .../cache/IgniteCreateIndexBenchmark.java          |    2 +-
 .../jdbc/vendors/BaseSelectRangeBenchmark.java     |    2 +-
 .../yarn/IgniteApplicationMasterSelfTest.java      |    4 +-
 .../zk/internal/ZkDiscoveryEventsData.java         |    2 +-
 .../zk/internal/ZkDiscoveryNodeLeaveEventData.java |    2 +-
 .../zk/internal/ZookeeperDiscoveryImpl.java        |   56 +-
 319 files changed, 3832 insertions(+), 3674 deletions(-)

diff --cc modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
index e4d674b,0000000..60c5ee1
mode 100644,000000..100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
@@@ -1,757 -1,0 +1,757 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.ignite.internal.processors.query.calcite.exec;
 +
 +import java.util.Collections;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Objects;
 +import java.util.UUID;
 +import java.util.function.Function;
 +import java.util.stream.Collectors;
 +import org.apache.calcite.plan.Context;
 +import org.apache.calcite.plan.Contexts;
 +import org.apache.calcite.plan.RelOptUtil;
 +import org.apache.calcite.rel.type.RelDataType;
 +import org.apache.calcite.sql.SqlInsert;
 +import org.apache.calcite.sql.SqlKind;
 +import org.apache.calcite.tools.Frameworks;
 +import org.apache.ignite.IgniteCheckedException;
 +import org.apache.ignite.IgniteException;
 +import org.apache.ignite.cache.query.FieldsQueryCursor;
 +import org.apache.ignite.events.EventType;
 +import org.apache.ignite.internal.GridKernalContext;
 +import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
 +import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
 +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 +import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
 +import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
 +import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
 +import org.apache.ignite.internal.processors.failure.FailureProcessor;
 +import org.apache.ignite.internal.processors.query.IgniteSQLException;
 +import org.apache.ignite.internal.processors.query.QueryState;
 +import org.apache.ignite.internal.processors.query.RunningQuery;
 +import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor;
 +import org.apache.ignite.internal.processors.query.calcite.Query;
 +import org.apache.ignite.internal.processors.query.calcite.QueryRegistry;
 +import org.apache.ignite.internal.processors.query.calcite.RootQuery;
 +import org.apache.ignite.internal.processors.query.calcite.RunningFragment;
 +import org.apache.ignite.internal.processors.query.calcite.exec.ddl.DdlCommandHandler;
 +import org.apache.ignite.internal.processors.query.calcite.exec.rel.Inbox;
 +import org.apache.ignite.internal.processors.query.calcite.exec.rel.Node;
 +import org.apache.ignite.internal.processors.query.calcite.exec.rel.Outbox;
 +import org.apache.ignite.internal.processors.query.calcite.message.ErrorMessage;
 +import org.apache.ignite.internal.processors.query.calcite.message.MessageService;
 +import org.apache.ignite.internal.processors.query.calcite.message.MessageType;
 +import org.apache.ignite.internal.processors.query.calcite.message.QueryStartRequest;
 +import org.apache.ignite.internal.processors.query.calcite.message.QueryStartResponse;
 +import org.apache.ignite.internal.processors.query.calcite.metadata.AffinityService;
 +import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentDescription;
 +import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMapping;
 +import org.apache.ignite.internal.processors.query.calcite.metadata.MappingService;
 +import org.apache.ignite.internal.processors.query.calcite.metadata.RemoteException;
 +import org.apache.ignite.internal.processors.query.calcite.prepare.BaseQueryContext;
 +import org.apache.ignite.internal.processors.query.calcite.prepare.CacheKey;
 +import org.apache.ignite.internal.processors.query.calcite.prepare.DdlPlan;
 +import org.apache.ignite.internal.processors.query.calcite.prepare.ExplainPlan;
 +import org.apache.ignite.internal.processors.query.calcite.prepare.FieldsMetadata;
 +import org.apache.ignite.internal.processors.query.calcite.prepare.FieldsMetadataImpl;
 +import org.apache.ignite.internal.processors.query.calcite.prepare.Fragment;
 +import org.apache.ignite.internal.processors.query.calcite.prepare.FragmentPlan;
 +import org.apache.ignite.internal.processors.query.calcite.prepare.MappingQueryContext;
 +import org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepPlan;
 +import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
 +import org.apache.ignite.internal.processors.query.calcite.prepare.PrepareServiceImpl;
 +import org.apache.ignite.internal.processors.query.calcite.prepare.QueryPlan;
 +import org.apache.ignite.internal.processors.query.calcite.prepare.QueryPlanCache;
 +import org.apache.ignite.internal.processors.query.calcite.prepare.ddl.CreateTableCommand;
 +import org.apache.ignite.internal.processors.query.calcite.schema.SchemaHolder;
 +import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
 +import org.apache.ignite.internal.processors.query.calcite.util.AbstractService;
 +import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 +import org.apache.ignite.internal.processors.query.calcite.util.ListFieldsQueryCursor;
 +import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
 +import org.apache.ignite.internal.util.typedef.F;
 +import org.apache.ignite.internal.util.typedef.X;
 +import org.apache.ignite.internal.util.typedef.internal.U;
 +import org.jetbrains.annotations.Nullable;
 +
 +import static java.util.Collections.singletonList;
 +import static org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor.FRAMEWORK_CONFIG;
 +import static org.apache.ignite.internal.processors.query.calcite.externalize.RelJsonReader.fromJson;
 +
 +/**
 + *
 + */
 +@SuppressWarnings("TypeMayBeWeakened")
 +public class ExecutionServiceImpl<Row> extends AbstractService implements ExecutionService<Row> {
 +    /** */
 +    private final DiscoveryEventListener discoLsnr;
 +
 +    /** */
 +    private UUID locNodeId;
 +
 +    /** */
 +    private GridEventStorageManager evtMgr;
 +
 +    /** */
 +    private GridCachePartitionExchangeManager<?, ?> exchangeMgr;
 +
 +    /** */
 +    private QueryPlanCache qryPlanCache;
 +
 +    /** */
 +    private SchemaHolder schemaHolder;
 +
 +    /** */
 +    private QueryTaskExecutor taskExecutor;
 +
 +    /** */
 +    private FailureProcessor failureProcessor;
 +
 +    /** */
 +    private AffinityService partSvc;
 +
 +    /** */
 +    private MailboxRegistry mailboxRegistry;
 +
 +    /** */
 +    private MappingService mappingSvc;
 +
 +    /** */
 +    private MessageService msgSvc;
 +
 +    /** */
 +    private ExchangeService exchangeSvc;
 +
 +    /** */
 +    private PrepareServiceImpl prepareSvc;
 +
 +    /** */
 +    private ClosableIteratorsHolder iteratorsHolder;
 +
 +    /** */
 +    private QueryRegistry qryReg;
 +
 +    /** */
 +    private final RowHandler<Row> handler;
 +
 +    /** */
 +    private final DdlCommandHandler ddlCmdHnd;
 +
 +    /**
 +     * @param ctx Kernal.
 +     */
 +    public ExecutionServiceImpl(GridKernalContext ctx, RowHandler<Row> handler) {
 +        super(ctx);
 +        this.handler = handler;
 +
 +        discoLsnr = (e, c) -> onNodeLeft(e.eventNode().id());
 +
 +        ddlCmdHnd = new DdlCommandHandler(
 +            ctx::query, ctx.cache(), ctx.security(), () -> schemaHolder().schema(null)
 +        );
 +    }
 +
 +    /**
 +     * @param locNodeId Local node ID.
 +     */
 +    public void localNodeId(UUID locNodeId) {
 +        this.locNodeId = locNodeId;
 +    }
 +
 +    /**
 +     * @return Local node ID.
 +     */
 +    public UUID localNodeId() {
 +        return locNodeId;
 +    }
 +
 +    /**
 +     * @param qryPlanCache Query cache.
 +     */
 +    public void queryPlanCache(QueryPlanCache qryPlanCache) {
 +        this.qryPlanCache = qryPlanCache;
 +    }
 +
 +    /**
 +     * @return Query cache.
 +     */
 +    public QueryPlanCache queryPlanCache() {
 +        return qryPlanCache;
 +    }
 +
 +    /**
 +     * @param schemaHolder Schema holder.
 +     */
 +    public void schemaHolder(SchemaHolder schemaHolder) {
 +        this.schemaHolder = schemaHolder;
 +    }
 +
 +    /**
 +     * @return Schema holder.
 +     */
 +    public SchemaHolder schemaHolder() {
 +        return schemaHolder;
 +    }
 +
 +    /**
 +     * @param taskExecutor Task executor.
 +     */
 +    public void taskExecutor(QueryTaskExecutor taskExecutor) {
 +        this.taskExecutor = taskExecutor;
 +    }
 +
 +    /**
 +     * @return Task executor.
 +     */
 +    public QueryTaskExecutor taskExecutor() {
 +        return taskExecutor;
 +    }
 +
 +    /**
 +     * @param failureProcessor Failure processor.
 +     */
 +    public void failureProcessor(FailureProcessor failureProcessor) {
 +        this.failureProcessor = failureProcessor;
 +    }
 +
 +    /**
 +     * @return Failure processor.
 +     */
 +    public FailureProcessor failureProcessor() {
 +        return failureProcessor;
 +    }
 +
 +    /**
 +     * @param partSvc Partition service.
 +     */
 +    public void partitionService(AffinityService partSvc) {
 +        this.partSvc = partSvc;
 +    }
 +
 +    /**
 +     * @return Partition service.
 +     */
 +    public AffinityService partitionService() {
 +        return partSvc;
 +    }
 +
 +    /**
 +     * @param mailboxRegistry Mailbox registry.
 +     */
 +    public void mailboxRegistry(MailboxRegistry mailboxRegistry) {
 +        this.mailboxRegistry = mailboxRegistry;
 +    }
 +
 +    /**
 +     * @return Mailbox registry.
 +     */
 +    public MailboxRegistry mailboxRegistry() {
 +        return mailboxRegistry;
 +    }
 +
 +    /**
 +     * @param mappingSvc Mapping service.
 +     */
 +    public void mappingService(MappingService mappingSvc) {
 +        this.mappingSvc = mappingSvc;
 +    }
 +
 +    /**
 +     * @return Mapping service.
 +     */
 +    public MappingService mappingService() {
 +        return mappingSvc;
 +    }
 +
 +    /**
 +     * @param msgSvc Message service.
 +     */
 +    public void messageService(MessageService msgSvc) {
 +        this.msgSvc = msgSvc;
 +    }
 +
 +    /**
 +     * @return Message service.
 +     */
 +    public MessageService messageService() {
 +        return msgSvc;
 +    }
 +
 +    /**
 +     * @param exchangeSvc Exchange service.
 +     */
 +    public void exchangeService(ExchangeService exchangeSvc) {
 +        this.exchangeSvc = exchangeSvc;
 +    }
 +
 +    /**
 +     * @param prepareSvc Prepare service.
 +     */
 +    public void prepareService(PrepareServiceImpl prepareSvc) {
 +        this.prepareSvc = prepareSvc;
 +    }
 +
 +    /**
 +     * @return Exchange service.
 +     */
 +    public ExchangeService exchangeService() {
 +        return exchangeSvc;
 +    }
 +
 +    /**
 +     * @param evtMgr Event manager.
 +     */
 +    public void eventManager(GridEventStorageManager evtMgr) {
 +        this.evtMgr = evtMgr;
 +    }
 +
 +    /**
 +     * @return Event manager.
 +     */
 +    public GridEventStorageManager eventManager() {
 +        return evtMgr;
 +    }
 +
 +    /**
 +     * @param exchangeMgr Exchange manager.
 +     */
 +    public void exchangeManager(GridCachePartitionExchangeManager<?, ?> exchangeMgr) {
 +        this.exchangeMgr = exchangeMgr;
 +    }
 +
 +    /**
 +     * @return Exchange manager.
 +     */
 +    public GridCachePartitionExchangeManager<?, ?> exchangeManager() {
 +        return exchangeMgr;
 +    }
 +
 +    /**
 +     * @param iteratorsHolder Iterators holder.
 +     */
 +    public void iteratorsHolder(ClosableIteratorsHolder iteratorsHolder) {
 +        this.iteratorsHolder = iteratorsHolder;
 +    }
 +
 +    /**
 +     * @return Iterators holder.
 +     */
 +    public ClosableIteratorsHolder iteratorsHolder() {
 +        return iteratorsHolder;
 +    }
 +
 +    /** */
 +    public void queryRegistry(QueryRegistry qryReg) {
 +        this.qryReg = qryReg;
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public void onStart(GridKernalContext ctx) {
 +        localNodeId(ctx.localNodeId());
 +        exchangeManager(ctx.cache().context().exchange());
 +        eventManager(ctx.event());
 +        iteratorsHolder(new ClosableIteratorsHolder(log));
 +
 +        CalciteQueryProcessor proc = Objects.requireNonNull(
 +            Commons.lookupComponent(ctx, CalciteQueryProcessor.class));
 +
 +        queryPlanCache(proc.queryPlanCache());
 +        schemaHolder(proc.schemaHolder());
 +        taskExecutor(proc.taskExecutor());
 +        failureProcessor(proc.failureProcessor());
 +        partitionService(proc.affinityService());
 +        mailboxRegistry(proc.mailboxRegistry());
 +        mappingService(proc.mappingService());
 +        messageService(proc.messageService());
 +        exchangeService(proc.exchangeService());
 +        queryRegistry(proc.queryRegistry());
 +        prepareService(proc.prepareService());
 +
 +        init();
-      }
++    }
 +
 +    /** {@inheritDoc} */
 +    @Override public void init() {
 +        messageService().register((n, m) -> onMessage(n, (QueryStartRequest)m), MessageType.QUERY_START_REQUEST);
 +        messageService().register((n, m) -> onMessage(n, (QueryStartResponse)m), MessageType.QUERY_START_RESPONSE);
 +        messageService().register((n, m) -> onMessage(n, (ErrorMessage)m), MessageType.QUERY_ERROR_MESSAGE);
 +
 +        eventManager().addDiscoveryEventListener(discoLsnr, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT);
 +
 +        iteratorsHolder().init();
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public void tearDown() {
 +        eventManager().removeDiscoveryEventListener(discoLsnr, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT);
 +
 +        iteratorsHolder().tearDown();
 +    }
 +
 +    /** */
 +    protected AffinityTopologyVersion topologyVersion() {
 +        return exchangeManager().readyAffinityVersion();
 +    }
 +
 +    /** */
 +    private BaseQueryContext createQueryContext(Context parent, @Nullable String schema) {
 +        return BaseQueryContext.builder()
 +            .parentContext(parent)
 +            .frameworkConfig(
 +                Frameworks.newConfigBuilder(FRAMEWORK_CONFIG)
 +                    .defaultSchema(schemaHolder().schema(schema))
 +                    .build()
 +            )
 +            .logger(log)
 +            .build();
 +    }
 +
 +    /** */
 +    private QueryPlan prepareFragment(BaseQueryContext ctx, String jsonFragment) {
 +        return new FragmentPlan(fromJson(ctx, jsonFragment));
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public FieldsQueryCursor<List<?>> executePlan(
 +        RootQuery<Row> qry,
 +        QueryPlan plan
 +    ) {
 +        switch (plan.type()) {
 +            case DML:
 +                ListFieldsQueryCursor<?> cur = mapAndExecutePlan(
 +                    qry,
 +                    (MultiStepPlan)plan
 +                );
 +
 +                cur.iterator().hasNext();
 +
 +                return cur;
 +
 +            case QUERY:
 +                return mapAndExecutePlan(
 +                    qry,
 +                    (MultiStepPlan)plan
 +                );
 +
 +            case EXPLAIN:
 +                return executeExplain(qry, (ExplainPlan)plan);
 +
 +            case DDL:
 +                return executeDdl(qry, (DdlPlan)plan);
 +
 +            default:
 +                throw new AssertionError("Unexpected plan type: " + plan);
 +        }
 +    }
 +
 +    /** */
 +    private FieldsQueryCursor<List<?>> executeDdl(RootQuery<Row> qry, DdlPlan plan) {
 +        try {
 +            ddlCmdHnd.handle(qry.id(), plan.command());
 +        }
 +        catch (IgniteCheckedException e) {
 +            throw new IgniteSQLException("Failed to execute DDL statement [stmt=" + qry.sql() +
 +                ", err=" + e.getMessage() + ']', e);
 +        }
 +        finally {
 +            qryReg.unregister(qry.id());
 +        }
 +
 +        if (plan.command() instanceof CreateTableCommand
 +            && ((CreateTableCommand)plan.command()).insertStatement() != null) {
 +            RootQuery<Row> insQry = qry.childQuery(schemaHolder.schema(qry.context().schemaName()));
 +
 +            qryReg.register(insQry);
 +
 +            SqlInsert insertStmt = ((CreateTableCommand)plan.command()).insertStatement();
 +
 +            QueryPlan dmlPlan = prepareSvc.prepareSingle(insertStmt, insQry.planningContext());
 +
 +            return executePlan(insQry, dmlPlan);
 +        }
 +        else {
 +            QueryCursorImpl<List<?>> resCur = new QueryCursorImpl<>(Collections.singletonList(
 +                Collections.singletonList(0L)), null, false, false);
 +
 +            IgniteTypeFactory typeFactory = qry.context().typeFactory();
 +
 +            resCur.fieldsMeta(new FieldsMetadataImpl(RelOptUtil.createDmlRowType(
 +                SqlKind.INSERT, typeFactory), null).queryFieldsMetadata(typeFactory));
 +
 +            return resCur;
 +        }
 +    }
 +
 +    /** */
 +    private ListFieldsQueryCursor<?> mapAndExecutePlan(
 +        RootQuery<Row> qry,
 +        MultiStepPlan plan
 +    ) {
 +        qry.mapping();
 +
 +        MappingQueryContext mapCtx = Commons.mapContext(locNodeId, topologyVersion());
 +        plan.init(mappingSvc, mapCtx);
 +
 +        List<Fragment> fragments = plan.fragments();
 +
 +        // Local execution
 +        Fragment fragment = F.first(fragments);
 +
 +        if (U.assertionsEnabled()) {
 +            assert fragment != null;
 +
 +            FragmentMapping mapping = plan.mapping(fragment);
 +
 +            assert mapping != null;
 +
 +            List<UUID> nodes = mapping.nodeIds();
 +
 +            assert nodes != null && nodes.size() == 1 && F.first(nodes).equals(localNodeId());
 +        }
 +
 +        FragmentDescription fragmentDesc = new FragmentDescription(
 +            fragment.fragmentId(),
 +            plan.mapping(fragment),
 +            plan.target(fragment),
 +            plan.remotes(fragment));
 +
 +        ExecutionContext<Row> ectx = new ExecutionContext<>(
 +            qry.context(),
 +            taskExecutor(),
 +            qry.id(),
 +            locNodeId,
 +            locNodeId,
 +            mapCtx.topologyVersion(),
 +            fragmentDesc,
 +            handler,
 +            Commons.parametersMap(qry.parameters()));
 +
 +        Node<Row> node = new LogicalRelImplementor<>(ectx, partitionService(), mailboxRegistry(),
 +            exchangeService(), failureProcessor()).go(fragment.root());
 +
 +        qry.run(ectx, plan, node);
 +
 +        Map<UUID, Long> fragmentsPerNode = fragments.stream()
 +            .skip(1)
 +            .flatMap(f -> f.mapping().nodeIds().stream())
 +            .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
 +
 +        // Start remote execution.
 +        for (int i = 1; i < fragments.size(); i++) {
 +            fragment = fragments.get(i);
 +            fragmentDesc = new FragmentDescription(
 +                fragment.fragmentId(),
 +                plan.mapping(fragment),
 +                plan.target(fragment),
 +                plan.remotes(fragment));
 +
 +            Throwable ex = null;
 +            byte[] parametersMarshalled = null;
 +
 +            for (UUID nodeId : fragmentDesc.nodeIds()) {
 +                if (ex != null)
 +                    qry.onResponse(nodeId, fragment.fragmentId(), ex);
 +                else {
 +                    try {
 +                        QueryStartRequest req = new QueryStartRequest(
 +                            qry.id(),
 +                            qry.context().schemaName(),
 +                            fragment.serialized(),
 +                            ectx.topologyVersion(),
 +                            fragmentDesc,
 +                            fragmentsPerNode.get(nodeId).intValue(),
 +                            qry.parameters(),
 +                            parametersMarshalled
 +                        );
 +
 +                        messageService().send(nodeId, req);
 +
 +                        // Avoid marshaling of the same parameters for other nodes.
 +                        if (parametersMarshalled == null)
 +                            parametersMarshalled = req.parametersMarshalled();
 +                    }
 +                    catch (Throwable e) {
 +                        qry.onResponse(nodeId, fragment.fragmentId(), ex = e);
 +                    }
 +                }
 +            }
 +        }
 +
 +        return new ListFieldsQueryCursor<>(plan, iteratorsHolder().iterator(qry.iterator()), ectx);
 +    }
 +
 +    /** */
 +    private FieldsQueryCursor<List<?>> executeExplain(RootQuery<Row> qry, ExplainPlan plan) {
 +        QueryCursorImpl<List<?>> cur = new QueryCursorImpl<>(singletonList(singletonList(plan.plan())));
 +        cur.fieldsMeta(plan.fieldsMeta().queryFieldsMetadata(Commons.typeFactory()));
 +
 +        qryReg.unregister(qry.id());
 +
 +        return cur;
 +    }
 +
 +    /** */
 +    private void executeFragment(Query<Row> qry, FragmentPlan plan, ExecutionContext<Row> ectx) {
 +        UUID origNodeId = ectx.originatingNodeId();
 +
 +        Outbox<Row> node = new LogicalRelImplementor<>(
 +            ectx,
 +            partitionService(),
 +            mailboxRegistry(),
 +            exchangeService(),
 +            failureProcessor()
 +        )
 +            .go(plan.root());
 +
 +        qry.addFragment(new RunningFragment<>(plan.root(), node, ectx));
 +
 +        node.init();
 +
 +        if (!qry.isExchangeWithInitNodeStarted(ectx.fragmentId())) {
 +            try {
 +                messageService().send(origNodeId, new QueryStartResponse(qry.id(), ectx.fragmentId()));
 +            }
 +            catch (IgniteCheckedException e) {
 +                IgniteException wrpEx = new IgniteException("Failed to send reply. [nodeId=" + origNodeId + ']', e);
 +
 +                throw wrpEx;
 +            }
 +        }
 +    }
 +
 +    /** */
 +    private FieldsMetadata queryFieldsMetadata(PlanningContext ctx, RelDataType sqlType,
 +        @Nullable List<List<String>> origins) {
 +        RelDataType resultType = TypeUtils.getResultType(
 +            ctx.typeFactory(), ctx.catalogReader(), sqlType, origins);
 +        return new FieldsMetadataImpl(resultType, origins);
 +    }
 +
 +    /** */
 +    private void onMessage(UUID nodeId, final QueryStartRequest msg) {
 +        assert nodeId != null && msg != null;
 +
 +        try {
 +            Query<Row> qry = (Query<Row>)qryReg.register(
 +                new Query<>(
 +                    msg.queryId(),
 +                    nodeId,
 +                    null,
 +                    exchangeSvc,
 +                    (q) -> qryReg.unregister(q.id()),
 +                    log,
 +                    msg.totalFragmentsCount()
 +                )
 +            );
 +
 +            final BaseQueryContext qctx = createQueryContext(Contexts.empty(), msg.schema());
 +
 +            QueryPlan qryPlan = queryPlanCache().queryPlan(
 +                new CacheKey(msg.schema(), msg.root()),
 +                () -> prepareFragment(qctx, msg.root())
 +            );
 +
 +            assert qryPlan.type() == QueryPlan.Type.FRAGMENT;
 +
 +            ExecutionContext<Row> ectx = new ExecutionContext<>(
 +                qctx,
 +                taskExecutor(),
 +                msg.queryId(),
 +                locNodeId,
 +                nodeId,
 +                msg.topologyVersion(),
 +                msg.fragmentDescription(),
 +                handler,
 +                Commons.parametersMap(msg.parameters())
 +            );
 +
 +            executeFragment(qry, (FragmentPlan)qryPlan, ectx);
 +        }
 +        catch (Throwable ex) {
 +            U.error(log, "Failed to start query fragment ", ex);
 +
 +            mailboxRegistry.outboxes(msg.queryId(), msg.fragmentId(), -1)
 +                .forEach(Outbox::close);
 +            mailboxRegistry.inboxes(msg.queryId(), msg.fragmentId(), -1)
 +                .forEach(Inbox::close);
 +
 +            try {
 +                messageService().send(nodeId, new QueryStartResponse(msg.queryId(), msg.fragmentId(), ex));
 +            }
 +            catch (IgniteCheckedException e) {
 +                U.error(log, "Error occurred during send error message: " + X.getFullStackTrace(e));
 +
 +                IgniteException wrpEx = new IgniteException("Error occurred during send error message", e);
 +
 +                e.addSuppressed(ex);
 +
 +                Query<Row> qry = (Query<Row>)qryReg.query(msg.queryId());
 +
 +                qry.cancel();
 +
 +                throw wrpEx;
 +            }
 +
 +            throw ex;
 +        }
 +    }
 +
 +    /** */
 +    private void onMessage(UUID nodeId, QueryStartResponse msg) {
 +        assert nodeId != null && msg != null;
 +
 +        RunningQuery qry = qryReg.query(msg.queryId());
 +
 +        if (qry != null) {
 +            assert qry instanceof RootQuery : "Unexpected query object: " + qry;
 +
 +            ((RootQuery<Row>)qry).onResponse(nodeId, msg.fragmentId(), msg.error());
 +        }
 +    }
 +
 +    /** */
 +    private void onMessage(UUID nodeId, ErrorMessage msg) {
 +        assert nodeId != null && msg != null;
 +
 +        RunningQuery qry = qryReg.query(msg.queryId());
 +
 +        if (qry != null && qry.state() != QueryState.CLOSED) {
 +            assert qry instanceof RootQuery : "Unexpected query object: " + qry;
 +
 +            Exception e = new RemoteException(nodeId, msg.queryId(), msg.fragmentId(), msg.error());
 +
 +            if (X.hasCause(msg.error(), ExecutionCancelledException.class)) {
 +                e = new IgniteSQLException(
 +                    "The query was cancelled while executing.",
 +                    IgniteQueryErrorCode.QUERY_CANCELED,
 +                    e
 +                );
 +            }
 +
 +            ((RootQuery<Row>)qry).onError(e);
 +        }
 +    }
 +
 +    /** */
 +    private void onNodeLeft(UUID nodeId) {
 +        qryReg.runningQueries()
 +            .forEach((qry) -> ((Query<Row>)qry).onNodeLeft(nodeId));
 +    }
 +}
diff --cc modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ConverterUtils.java
index b9e0f89,0000000..671fb42
mode 100644,000000..100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ConverterUtils.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ConverterUtils.java
@@@ -1,458 -1,0 +1,458 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to you under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + * http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.ignite.internal.processors.query.calcite.exec.exp;
 +
 +import java.lang.reflect.Type;
 +import java.math.BigDecimal;
 +import java.util.ArrayList;
 +import java.util.List;
 +import java.util.UUID;
 +
 +import org.apache.calcite.adapter.enumerable.RexImpTable;
 +import org.apache.calcite.linq4j.tree.ConstantExpression;
 +import org.apache.calcite.linq4j.tree.ConstantUntypedNull;
 +import org.apache.calcite.linq4j.tree.Expression;
 +import org.apache.calcite.linq4j.tree.ExpressionType;
 +import org.apache.calcite.linq4j.tree.Expressions;
 +import org.apache.calcite.linq4j.tree.Primitive;
 +import org.apache.calcite.linq4j.tree.Types;
 +import org.apache.calcite.linq4j.tree.UnaryExpression;
 +import org.apache.calcite.rel.type.RelDataType;
 +import org.apache.calcite.rex.RexNode;
 +import org.apache.calcite.runtime.SqlFunctions;
 +import org.apache.calcite.sql.type.SqlTypeName;
 +import org.apache.calcite.util.BuiltInMethod;
 +import org.apache.calcite.util.Util;
 +
 +/** */
 +public class ConverterUtils {
 +    /** */
 +    private ConverterUtils() {
 +    }
 +
 +    /**
 +     * In Calcite, {@code java.sql.Date} and {@code java.sql.Time} are stored as {@code Integer} type, {@code
 +     * java.sql.Timestamp} is stored as {@code Long} type.
 +     */
 +    static Expression toInternal(Expression operand, Type targetType) {
 +        return toInternal(operand, operand.getType(), targetType);
 +    }
 +
 +    /** */
 +    private static Expression toInternal(Expression operand,
 +        Type fromType, Type targetType) {
 +        if (fromType == java.sql.Date.class) {
 +            if (targetType == int.class)
-               return Expressions.call(BuiltInMethod.DATE_TO_INT.method, operand);
++                return Expressions.call(BuiltInMethod.DATE_TO_INT.method, operand);
 +            else if (targetType == Integer.class)
-               return Expressions.call(BuiltInMethod.DATE_TO_INT_OPTIONAL.method, operand);
++                return Expressions.call(BuiltInMethod.DATE_TO_INT_OPTIONAL.method, operand);
 +        }
 +        else if (fromType == java.sql.Time.class) {
 +            if (targetType == int.class)
-               return Expressions.call(BuiltInMethod.TIME_TO_INT.method, operand);
++                return Expressions.call(BuiltInMethod.TIME_TO_INT.method, operand);
 +            else if (targetType == Integer.class)
-               return Expressions.call(BuiltInMethod.TIME_TO_INT_OPTIONAL.method, operand);
++                return Expressions.call(BuiltInMethod.TIME_TO_INT_OPTIONAL.method, operand);
 +        }
 +        else if (fromType == java.sql.Timestamp.class) {
 +            if (targetType == long.class)
-               return Expressions.call(BuiltInMethod.TIMESTAMP_TO_LONG.method, operand);
++                return Expressions.call(BuiltInMethod.TIMESTAMP_TO_LONG.method, operand);
 +            else if (targetType == Long.class)
-               return Expressions.call(BuiltInMethod.TIMESTAMP_TO_LONG_OPTIONAL.method, operand);
++                return Expressions.call(BuiltInMethod.TIMESTAMP_TO_LONG_OPTIONAL.method, operand);
 +        }
 +        return operand;
 +    }
 +
 +    /** Converts from internal representation to JDBC representation used by
 +     * arguments of user-defined functions. For example, converts date values from
 +     * {@code int} to {@link java.sql.Date}. */
 +    private static Expression fromInternal(Expression operand, Type targetType) {
 +        return fromInternal(operand, operand.getType(), targetType);
 +    }
 +
 +    /** */
 +    private static Expression fromInternal(Expression operand,
 +        Type fromType, Type targetType) {
 +        if (operand == ConstantUntypedNull.INSTANCE)
-           return operand;
++            return operand;
 +        if (!(operand.getType() instanceof Class))
-           return operand;
++            return operand;
 +        if (Types.isAssignableFrom(targetType, fromType))
-           return operand;
++            return operand;
 +        if (targetType == java.sql.Date.class) {
 +            // E.g. from "int" or "Integer" to "java.sql.Date",
 +            // generate "SqlFunctions.internalToDate".
 +            if (isA(fromType, Primitive.INT))
-               return Expressions.call(BuiltInMethod.INTERNAL_TO_DATE.method, operand);
++                return Expressions.call(BuiltInMethod.INTERNAL_TO_DATE.method, operand);
 +        }
 +        else if (targetType == java.sql.Time.class) {
 +            // E.g. from "int" or "Integer" to "java.sql.Time",
 +            // generate "SqlFunctions.internalToTime".
 +            if (isA(fromType, Primitive.INT))
-               return Expressions.call(BuiltInMethod.INTERNAL_TO_TIME.method, operand);
++                return Expressions.call(BuiltInMethod.INTERNAL_TO_TIME.method, operand);
 +        }
 +        else if (targetType == java.sql.Timestamp.class) {
 +            // E.g. from "long" or "Long" to "java.sql.Timestamp",
 +            // generate "SqlFunctions.internalToTimestamp".
 +            if (isA(fromType, Primitive.LONG))
-               return Expressions.call(BuiltInMethod.INTERNAL_TO_TIMESTAMP.method, operand);
++                return Expressions.call(BuiltInMethod.INTERNAL_TO_TIMESTAMP.method, operand);
 +        }
 +        if (Primitive.is(operand.type)
 +            && Primitive.isBox(targetType)) {
 +            // E.g. operand is "int", target is "Long", generate "(long) operand".
 +            return Expressions.convert_(operand,
 +                Primitive.ofBox(targetType).primitiveClass);
 +        }
 +        return operand;
 +    }
 +
 +    /** */
 +    static List<Expression> fromInternal(Class<?>[] targetTypes,
 +        List<Expression> expressions) {
 +        final List<Expression> list = new ArrayList<>();
 +        if (targetTypes.length == expressions.size()) {
 +            for (int i = 0; i < expressions.size(); i++)
 +                list.add(fromInternal(expressions.get(i), targetTypes[i]));
 +        } else {
 +            int j = 0;
 +            for (int i = 0; i < expressions.size(); i++) {
 +                Class<?> type;
 +                if (!targetTypes[j].isArray()) {
 +                    type = targetTypes[j];
 +                    j++;
 +                } else
 +                    type = targetTypes[j].getComponentType();
 +
 +                list.add(fromInternal(expressions.get(i), type));
 +            }
 +        }
 +        return list;
 +    }
 +
 +    /** */
 +    private static Type toInternal(RelDataType type) {
 +        return toInternal(type, false);
 +    }
 +
 +    /** */
 +    static Type toInternal(RelDataType type, boolean forceNotNull) {
 +        switch (type.getSqlTypeName()) {
 +            case DATE:
 +            case TIME:
 +                return type.isNullable() && !forceNotNull ? Integer.class : int.class;
 +            case TIMESTAMP:
 +                return type.isNullable() && !forceNotNull ? Long.class : long.class;
 +            default:
 +                return null; // we don't care; use the default storage type
 +        }
 +    }
 +
 +    /** */
 +    static List<Type> internalTypes(List<? extends RexNode> operandList) {
 +        return Util.transform(operandList, node -> toInternal(node.getType()));
 +    }
 +
 +    /**
 +     * Convert {@code operand} to target type {@code toType}.
 +     *
 +     * @param operand The expression to convert
 +     * @param toType Target type
 +     * @return A new expression with type {@code toType} or original if there is no need to convert
 +     */
 +    public static Expression convert(Expression operand, Type toType) {
 +        final Type fromType = operand.getType();
 +        return convert(operand, fromType, toType);
 +    }
 +
 +    /**
 +     * Convert {@code operand} from {@code fromType} to {@code targetType} which is BigDecimal type.
 +     *
 +     * @param operand The expression to convert
 +     * @param targetType Target type
 +     * @return An expression with BidDecimal type, which calls IgniteSqlFunctions.toBigDecimal function.
 +     */
 +    public static Expression convertToDecimal(Expression operand, RelDataType targetType) {
 +        assert targetType.getSqlTypeName() == SqlTypeName.DECIMAL;
 +        return Expressions.call(
 +                IgniteSqlFunctions.class,
 +                "toBigDecimal",
 +                operand,
 +                Expressions.constant(targetType.getPrecision()),
 +                Expressions.constant(targetType.getScale()));
 +    }
 +
 +    /**
 +     * Convert {@code operand} to target type {@code toType}.
 +     *
 +     * @param operand The expression to convert
 +     * @param fromType Field type
 +     * @param toType Target type
 +     * @return A new expression with type {@code toType} or original if there is no need to convert
 +     */
 +    public static Expression convert(Expression operand, Type fromType, Type toType) {
 +        if (!Types.needTypeCast(fromType, toType))
-           return operand;
++            return operand;
 +
 +        if (toType == Void.class)
 +            return RexImpTable.NULL_EXPR;
 +
 +        if (toType == BigDecimal.class)
 +            throw new AssertionError("For conversion to decimal, ConverterUtils#convertToDecimal method should be used instead.");
 +
 +        // E.g. from "Short" to "int".
 +        // Generate "x.intValue()".
 +        final Primitive toPrimitive = Primitive.of(toType);
 +        final Primitive toBox = Primitive.ofBox(toType);
 +        final Primitive fromBox = Primitive.ofBox(fromType);
 +        final Primitive fromPrimitive = Primitive.of(fromType);
 +        final boolean fromNumber = fromType instanceof Class
 +            && Number.class.isAssignableFrom((Class)fromType);
 +        if (fromType == String.class) {
 +            if (toPrimitive != null) {
 +                switch (toPrimitive) {
 +                    case CHAR:
 +                    case SHORT:
 +                    case INT:
 +                    case LONG:
 +                    case FLOAT:
 +                    case DOUBLE:
 +                        // Generate "SqlFunctions.toShort(x)".
 +                        return Expressions.call(
 +                            SqlFunctions.class,
 +                            "to" + SqlFunctions.initcap(toPrimitive.primitiveName),
 +                            operand);
 +                    default:
 +                        // Generate "Short.parseShort(x)".
 +                        return Expressions.call(
 +                            toPrimitive.boxClass,
 +                            "parse" + SqlFunctions.initcap(toPrimitive.primitiveName),
 +                            operand);
 +                }
 +            }
 +            if (toBox != null) {
 +                switch (toBox) {
 +                    case CHAR:
 +                        // Generate "SqlFunctions.toCharBoxed(x)".
 +                        return Expressions.call(
 +                            SqlFunctions.class,
 +                            "to" + SqlFunctions.initcap(toBox.primitiveName) + "Boxed",
 +                            operand);
 +                    default:
 +                        // Generate "Short.valueOf(x)".
 +                        return Expressions.call(
 +                            toBox.boxClass,
 +                            "valueOf",
 +                            operand);
 +                }
 +            }
 +        }
 +        if (toPrimitive != null) {
 +            if (fromPrimitive != null) {
 +                // E.g. from "float" to "double"
 +                return Expressions.convert_(
 +                    operand, toPrimitive.primitiveClass);
 +            }
 +            if (fromNumber || fromBox == Primitive.CHAR) {
 +                // Generate "x.shortValue()".
 +                return Expressions.unbox(operand, toPrimitive);
 +            }
 +            else {
 +                // E.g. from "Object" to "short".
 +                // Generate "SqlFunctions.toShort(x)"
 +                return Expressions.call(
 +                    SqlFunctions.class,
 +                    "to" + SqlFunctions.initcap(toPrimitive.primitiveName),
 +                    operand);
 +            }
 +        }
 +        else if (fromNumber && toBox != null) {
 +            // E.g. from "Short" to "Integer"
 +            // Generate "x == null ? null : Integer.valueOf(x.intValue())"
 +            return Expressions.condition(
 +                Expressions.equal(operand, RexImpTable.NULL_EXPR),
 +                RexImpTable.NULL_EXPR,
 +                Expressions.box(
 +                    Expressions.unbox(operand, toBox),
 +                    toBox));
 +        }
 +        else if (fromPrimitive != null && toBox != null) {
 +            // E.g. from "int" to "Long".
 +            // Generate Long.valueOf(x)
 +            // Eliminate primitive casts like Long.valueOf((long) x)
 +            if (operand instanceof UnaryExpression) {
 +                UnaryExpression una = (UnaryExpression)operand;
 +                if (una.nodeType == ExpressionType.Convert
 +                    && Primitive.of(una.getType()) == toBox) {
 +                    Primitive origin = Primitive.of(una.expression.type);
 +                    if (origin != null && toBox.assignableFrom(origin))
-                       return Expressions.box(una.expression, toBox);
++                        return Expressions.box(una.expression, toBox);
 +                }
 +            }
 +            if (fromType == toBox.primitiveClass)
-               return Expressions.box(operand, toBox);
++                return Expressions.box(operand, toBox);
 +            // E.g., from "int" to "Byte".
 +            // Convert it first and generate "Byte.valueOf((byte)x)"
 +            // Because there is no method "Byte.valueOf(int)" in Byte
 +            return Expressions.box(
 +                Expressions.convert_(operand, toBox.primitiveClass),
 +                toBox);
 +        }
 +        // Convert datetime types to internal storage type:
 +        // 1. java.sql.Date -> int or Integer
 +        // 2. java.sql.Time -> int or Integer
 +        // 3. java.sql.Timestamp -> long or Long
 +        if (representAsInternalType(fromType)) {
 +            final Expression internalTypedOperand =
 +                toInternal(operand, fromType, toType);
 +            if (operand != internalTypedOperand)
-               return internalTypedOperand;
++                return internalTypedOperand;
 +        }
 +        // Convert internal storage type to datetime types:
 +        // 1. int or Integer -> java.sql.Date
 +        // 2. int or Integer -> java.sql.Time
 +        // 3. long or Long -> java.sql.Timestamp
 +        if (representAsInternalType(toType)) {
 +            final Expression originTypedOperand =
 +                fromInternal(operand, fromType, toType);
 +            if (operand != originTypedOperand)
-               return originTypedOperand;
++                return originTypedOperand;
 +        }
 +        else if (toType == String.class) {
 +            if (fromPrimitive != null) {
 +                switch (fromPrimitive) {
 +                    case DOUBLE:
 +                    case FLOAT:
 +                        // E.g. from "double" to "String"
 +                        // Generate "SqlFunctions.toString(x)"
 +                        return Expressions.call(
 +                            SqlFunctions.class,
 +                            "toString",
 +                            operand);
 +                    default:
 +                        // E.g. from "int" to "String"
 +                        // Generate "Integer.toString(x)"
 +                        return Expressions.call(
 +                            fromPrimitive.boxClass,
 +                            "toString",
 +                            operand);
 +                }
 +            }
 +            else if (fromType == BigDecimal.class) {
 +                // E.g. from "BigDecimal" to "String"
 +                // Generate "SqlFunctions.toString(x)"
 +                return Expressions.condition(
 +                    Expressions.equal(operand, RexImpTable.NULL_EXPR),
 +                    RexImpTable.NULL_EXPR,
 +                    Expressions.call(
 +                        IgniteSqlFunctions.class,
 +                        "toString",
 +                        operand));
 +            }
 +            else {
 +                Expression result;
 +                try {
 +                    // Avoid to generate code like:
 +                    // "null.toString()" or "(xxx) null.toString()"
 +                    if (operand instanceof ConstantExpression) {
 +                        ConstantExpression ce = (ConstantExpression)operand;
 +                        if (ce.value == null)
-                           return Expressions.convert_(operand, toType);
++                            return Expressions.convert_(operand, toType);
 +                    }
 +                    // Try to call "toString()" method
 +                    // E.g. from "Integer" to "String"
 +                    // Generate "x == null ? null : x.toString()"
 +                    result = Expressions.condition(
 +                        Expressions.equal(operand, RexImpTable.NULL_EXPR),
 +                        RexImpTable.NULL_EXPR,
 +                        Expressions.call(operand, "toString"));
 +                }
 +                catch (RuntimeException e) {
 +                    // For some special cases, e.g., "BuiltInMethod.LESSER",
 +                    // its return type is generic ("Comparable"), which contains
 +                    // no "toString()" method. We fall through to "(String)x".
 +                    return Expressions.convert_(operand, toType);
 +                }
 +                return result;
 +            }
 +        }
 +        else if (toType == UUID.class && fromType == String.class)
 +            return Expressions.call(UUID.class, "fromString", operand);
 +
 +        return Expressions.convert_(operand, toType);
 +    }
 +
 +    /** */
 +    private static boolean isA(Type fromType, Primitive primitive) {
 +        return Primitive.of(fromType) == primitive
 +            || Primitive.ofBox(fromType) == primitive;
 +    }
 +
 +    /** */
 +    private static boolean representAsInternalType(Type type) {
 +        return type == java.sql.Date.class
 +            || type == java.sql.Time.class
 +            || type == java.sql.Timestamp.class;
 +    }
 +
 +    /**
 +     * In {@link org.apache.calcite.sql.type.SqlTypeAssignmentRule},
 +     * some rules decide whether one type can be assignable to another type.
 +     * Based on these rules, a function can accept arguments with assignable types.
 +     *
 +     * <p>For example, a function with Long type operand can accept Integer as input.
 +     * See {@code org.apache.calcite.sql.SqlUtil#filterRoutinesByParameterType()} for details.
 +     *
 +     * <p>During query execution, some of the assignable types need explicit conversion
 +     * to the target types. i.e., Decimal expression should be converted to Integer
 +     * before it is assigned to the Integer type Lvalue(In Java, Decimal can not be assigned to
 +     * Integer directly).
 +     *
 +     * @param targetTypes Formal operand types declared for the function arguments
 +     * @param arguments Input expressions to the function
 +     * @return Input expressions with probable type conversion
 +     */
 +    static List<Expression> convertAssignableTypes(Class<?>[] targetTypes,
 +        List<Expression> arguments) {
 +        final List<Expression> list = new ArrayList<>();
 +        if (targetTypes.length == arguments.size()) {
 +            for (int i = 0; i < arguments.size(); i++)
 +                list.add(convertAssignableType(arguments.get(i), targetTypes[i]));
 +        } else {
 +            int j = 0;
 +            for (Expression argument: arguments) {
 +                Class<?> type;
 +                if (!targetTypes[j].isArray()) {
 +                    type = targetTypes[j];
 +                    j++;
 +                } else
 +                    type = targetTypes[j].getComponentType();
 +
 +                list.add(convertAssignableType(argument, type));
 +            }
 +        }
 +        return list;
 +    }
 +
 +    /**
 +     * Handles decimal type specifically with explicit type conversion.
 +     */
 +    private static Expression convertAssignableType(Expression argument, Type targetType) {
 +        if (targetType != BigDecimal.class)
 +            return argument;
 +
 +        return convert(argument, targetType);
 +    }
 +}
diff --cc modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/RexExecutorImpl.java
index ff37fef,0000000..b39d937
mode 100644,000000..100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/RexExecutorImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/RexExecutorImpl.java
@@@ -1,194 -1,0 +1,193 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to you under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + * http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.ignite.internal.processors.query.calcite.exec.exp;
 +
 +import java.lang.reflect.Modifier;
 +import java.lang.reflect.Type;
 +import java.util.List;
 +
 +import com.google.common.collect.ImmutableList;
 +import org.apache.calcite.DataContext;
 +import org.apache.calcite.adapter.java.JavaTypeFactory;
 +import org.apache.calcite.config.CalciteSystemProperty;
 +import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
 +import org.apache.calcite.linq4j.tree.BlockBuilder;
 +import org.apache.calcite.linq4j.tree.Expression;
 +import org.apache.calcite.linq4j.tree.Expressions;
 +import org.apache.calcite.linq4j.tree.IndexExpression;
 +import org.apache.calcite.linq4j.tree.MethodCallExpression;
 +import org.apache.calcite.linq4j.tree.MethodDeclaration;
 +import org.apache.calcite.linq4j.tree.ParameterExpression;
 +import org.apache.calcite.rel.type.RelDataType;
 +import org.apache.calcite.rel.type.RelDataTypeFactory;
 +import org.apache.calcite.rex.RexBuilder;
 +import org.apache.calcite.rex.RexExecutable;
 +import org.apache.calcite.rex.RexExecutor;
 +import org.apache.calcite.rex.RexNode;
 +import org.apache.calcite.rex.RexProgram;
 +import org.apache.calcite.rex.RexProgramBuilder;
 +import org.apache.calcite.sql.validate.SqlConformance;
 +import org.apache.calcite.sql.validate.SqlConformanceEnum;
 +import org.apache.calcite.util.BuiltInMethod;
 +import org.apache.calcite.util.Util;
 +import org.apache.ignite.internal.processors.query.calcite.type.UuidType;
 +
 +/**
 + * Evaluates a {@link RexNode} expression.
 + *
 + * <p>For this impl, all the public methods should be
 + * static except that it inherits from {@link RexExecutor}.
 + * This pretends that other code in the project assumes
 + * the executor instance is {@link RexExecutorImpl}.
 +*/
 +public class RexExecutorImpl implements RexExecutor {
-   /** Data context. */
-   private final DataContext dataCtx;
++    /** Data context. */
++    private final DataContext dataCtx;
 +
-   /**
-    * @param dataCtx Data context.
-    */
-   public RexExecutorImpl(DataContext dataCtx) {
-     this.dataCtx = dataCtx;
-   }
- 
-   /** */
-   private static String compile(RexBuilder rexBuilder, List<RexNode> constExps,
-       RexToLixTranslator.InputGetter getter) {
-     final RelDataTypeFactory typeFactory = rexBuilder.getTypeFactory();
-     final RelDataType emptyRowType = typeFactory.builder().build();
-     return compile(rexBuilder, constExps, getter, emptyRowType);
-   }
- 
-   /** */
-   private static String compile(RexBuilder rexBuilder, List<RexNode> constExps,
-       RexToLixTranslator.InputGetter getter, RelDataType rowType) {
-     final RexProgramBuilder programBuilder =
-         new RexProgramBuilder(rowType, rexBuilder);
-     for (RexNode node : constExps) {
-       programBuilder.addProject(
-           node, "c" + programBuilder.getProjectList().size());
++    /**
++     * @param dataCtx Data context.
++     */
++    public RexExecutorImpl(DataContext dataCtx) {
++        this.dataCtx = dataCtx;
 +    }
-     final JavaTypeFactoryImpl javaTypeFactory =
-         new JavaTypeFactoryImpl(rexBuilder.getTypeFactory().getTypeSystem());
-     final BlockBuilder blockBuilder = new BlockBuilder();
-     final ParameterExpression root0_ =
-         Expressions.parameter(Object.class, "root0");
-     final ParameterExpression root_ = DataContext.ROOT;
-     blockBuilder.add(
-         Expressions.declare(
-             Modifier.FINAL, root_,
-             Expressions.convert_(root0_, DataContext.class)));
-     final SqlConformance conformance = SqlConformanceEnum.DEFAULT;
-     final RexProgram program = programBuilder.getProgram();
-     final List<Expression> expressions =
-         RexToLixTranslator.translateProjects(program, javaTypeFactory,
-             conformance, blockBuilder, null, root_, getter, null);
-     blockBuilder.add(
-         Expressions.return_(null,
-             Expressions.newArrayInit(Object[].class, expressions)));
-     final MethodDeclaration mtdDecl =
-         Expressions.methodDecl(Modifier.PUBLIC, Object[].class,
-             BuiltInMethod.FUNCTION1_APPLY.method.getName(),
-             ImmutableList.of(root0_), blockBuilder.toBlock());
-     String code = Expressions.toString(mtdDecl);
-     if (CalciteSystemProperty.DEBUG.value())
-       Util.debugCode(System.out, code);
-     return code;
-   }
 +
-   /**
-    * Creates an {@link RexExecutable} that allows to apply the
-    * generated code during query processing (filter, projection).
-    *
-    * @param rexBuilder Rex builder
-    * @param exps Expressions
-    * @param rowType describes the structure of the input row.
-    */
-   public static RexExecutable getExecutable(RexBuilder rexBuilder, List<RexNode> exps,
-       RelDataType rowType) {
-     final JavaTypeFactoryImpl typeFactory =
-         new JavaTypeFactoryImpl(rexBuilder.getTypeFactory().getTypeSystem());
-     final RexToLixTranslator.InputGetter getter = new DataContextInputGetter(rowType, typeFactory);
-     final String code = compile(rexBuilder, exps, getter, rowType);
-     return new RexExecutable(code, "generated Rex code");
-   }
- 
-   /**
-    * Do constant reduction using generated code.
-    */
-   @Override public void reduce(RexBuilder rexBuilder, List<RexNode> constExps,
-       List<RexNode> reducedValues) {
-     for (RexNode node : constExps) {
-       // Do not simplify UUID types, since we can't convert it to literal of this type.
-       if (node.getType() instanceof UuidType) {
-         reducedValues.addAll(constExps);
-         return;
-       }
++    /** */
++    private static String compile(RexBuilder rexBuilder, List<RexNode> constExps, RexToLixTranslator.InputGetter getter) {
++        final RelDataTypeFactory typeFactory = rexBuilder.getTypeFactory();
++        final RelDataType emptyRowType = typeFactory.builder().build();
++        return compile(rexBuilder, constExps, getter, emptyRowType);
 +    }
-     final String code = compile(rexBuilder, constExps,
-         (list, index, storageType) -> {
-           throw new UnsupportedOperationException();
-         });
 +
-     final RexExecutable executable = new RexExecutable(code, constExps);
-     executable.setDataContext(dataCtx);
-     executable.reduce(rexBuilder, constExps, reducedValues);
-   }
++    /** */
++    private static String compile(
++        RexBuilder rexBuilder,
++        List<RexNode> constExps,
++        RexToLixTranslator.InputGetter getter,
++        RelDataType rowType
++    ) {
++        final RexProgramBuilder programBuilder =
++            new RexProgramBuilder(rowType, rexBuilder);
++        for (RexNode node : constExps) {
++            programBuilder.addProject(
++                node, "c" + programBuilder.getProjectList().size());
++        }
++        final JavaTypeFactoryImpl javaTypeFactory =
++            new JavaTypeFactoryImpl(rexBuilder.getTypeFactory().getTypeSystem());
++        final BlockBuilder blockBuilder = new BlockBuilder();
++        final ParameterExpression root0_ =
++            Expressions.parameter(Object.class, "root0");
++        final ParameterExpression root_ = DataContext.ROOT;
++        blockBuilder.add(
++            Expressions.declare(
++                Modifier.FINAL, root_,
++                Expressions.convert_(root0_, DataContext.class)));
++        final SqlConformance conformance = SqlConformanceEnum.DEFAULT;
++        final RexProgram program = programBuilder.getProgram();
++        final List<Expression> expressions =
++            RexToLixTranslator.translateProjects(program, javaTypeFactory,
++                conformance, blockBuilder, null, root_, getter, null);
++        blockBuilder.add(
++            Expressions.return_(null,
++                Expressions.newArrayInit(Object[].class, expressions)));
++        final MethodDeclaration mtdDecl =
++            Expressions.methodDecl(Modifier.PUBLIC, Object[].class,
++                BuiltInMethod.FUNCTION1_APPLY.method.getName(),
++                ImmutableList.of(root0_), blockBuilder.toBlock());
++        String code = Expressions.toString(mtdDecl);
++        if (CalciteSystemProperty.DEBUG.value())
++            Util.debugCode(System.out, code);
++        return code;
++    }
 +
-   /**
-    * Implementation of
-    * {@link org.apache.calcite.adapter.enumerable.RexToLixTranslator.InputGetter}
-    * that reads the values of input fields by calling
-    * <code>{@link org.apache.calcite.DataContext#get}("inputRecord")</code>.
-    */
-   private static class DataContextInputGetter implements RexToLixTranslator.InputGetter {
++    /**
++     * Creates an {@link RexExecutable} that allows to apply the
++     * generated code during query processing (filter, projection).
++     *
++     * @param rexBuilder Rex builder
++     * @param exps Expressions
++     * @param rowType describes the structure of the input row.
++     */
++    public static RexExecutable getExecutable(RexBuilder rexBuilder, List<RexNode> exps, RelDataType rowType) {
++        final JavaTypeFactoryImpl typeFactory =
++            new JavaTypeFactoryImpl(rexBuilder.getTypeFactory().getTypeSystem());
++        final RexToLixTranslator.InputGetter getter = new DataContextInputGetter(rowType, typeFactory);
++        final String code = compile(rexBuilder, exps, getter, rowType);
++        return new RexExecutable(code, "generated Rex code");
++    }
 +
-     /** Type factory. */
-     private final RelDataTypeFactory typeFactory;
++    /**
++     * Do constant reduction using generated code.
++     */
++    @Override public void reduce(RexBuilder rexBuilder, List<RexNode> constExps, List<RexNode> reducedValues) {
++        for (RexNode node : constExps) {
++            // Do not simplify UUID types, since we can't convert it to literal of this type.
++            if (node.getType() instanceof UuidType) {
++                reducedValues.addAll(constExps);
++                return;
++            }
++        }
++        final String code = compile(rexBuilder, constExps,
++            (list, index, storageType) -> {
++                throw new UnsupportedOperationException();
++            });
 +
-     /** Row type. */
-     private final RelDataType rowType;
++        final RexExecutable executable = new RexExecutable(code, constExps);
++        executable.setDataContext(dataCtx);
++        executable.reduce(rexBuilder, constExps, reducedValues);
++    }
 +
 +    /**
-      * @param rowType Row type.
-      * @param typeFactory Type factory.
++     * Implementation of
++     * {@link org.apache.calcite.adapter.enumerable.RexToLixTranslator.InputGetter}
++     * that reads the values of input fields by calling
++     * <code>{@link org.apache.calcite.DataContext#get}("inputRecord")</code>.
 +     */
-     DataContextInputGetter(RelDataType rowType,
-         RelDataTypeFactory typeFactory) {
-       this.rowType = rowType;
-       this.typeFactory = typeFactory;
-     }
++    private static class DataContextInputGetter implements RexToLixTranslator.InputGetter {
++        /** Type factory. */
++        private final RelDataTypeFactory typeFactory;
++
++        /** Row type. */
++        private final RelDataType rowType;
++
++        /**
++         * @param rowType Row type.
++         * @param typeFactory Type factory.
++         */
++        DataContextInputGetter(RelDataType rowType, RelDataTypeFactory typeFactory) {
++            this.rowType = rowType;
++            this.typeFactory = typeFactory;
++        }
 +
-     /** {@inheritDoc} */
-     @Override public Expression field(BlockBuilder list, int idx, Type storageType) {
-       MethodCallExpression recFromCtx = Expressions.call(
-           DataContext.ROOT,
-           BuiltInMethod.DATA_CONTEXT_GET.method,
-           Expressions.constant("inputRecord"));
-       Expression recFromCtxCasted =
-           ConverterUtils.convert(recFromCtx, Object[].class);
-       IndexExpression recordAccess = Expressions.arrayIndex(recFromCtxCasted,
-           Expressions.constant(idx));
-       if (storageType == null) {
-         final RelDataType fieldType =
-             rowType.getFieldList().get(idx).getType();
-         storageType = ((JavaTypeFactory)typeFactory).getJavaClass(fieldType);
-       }
-       return ConverterUtils.convert(recordAccess, storageType);
++        /** {@inheritDoc} */
++        @Override public Expression field(BlockBuilder list, int idx, Type storageType) {
++            MethodCallExpression recFromCtx = Expressions.call(
++                DataContext.ROOT,
++                BuiltInMethod.DATA_CONTEXT_GET.method,
++                Expressions.constant("inputRecord"));
++            Expression recFromCtxCasted =
++                ConverterUtils.convert(recFromCtx, Object[].class);
++            IndexExpression recordAccess = Expressions.arrayIndex(recFromCtxCasted,
++                Expressions.constant(idx));
++            if (storageType == null) {
++                final RelDataType fieldType =
++                    rowType.getFieldList().get(idx).getType();
++                storageType = ((JavaTypeFactory)typeFactory).getJavaClass(fieldType);
++            }
++            return ConverterUtils.convert(recordAccess, storageType);
++        }
 +    }
-   }
 +}
diff --cc modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/RexToLixTranslator.java
index 19f1b48,0000000..b5440fc
mode 100644,000000..100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/RexToLixTranslator.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/RexToLixTranslator.java
@@@ -1,1374 -1,0 +1,1374 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to you under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + * http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.ignite.internal.processors.query.calcite.exec.exp;
 +
 +import java.lang.reflect.Method;
 +import java.lang.reflect.Modifier;
 +import java.lang.reflect.Type;
 +import java.math.BigDecimal;
 +import java.util.ArrayList;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Objects;
 +import com.google.common.collect.ImmutableList;
 +import org.apache.calcite.adapter.enumerable.PhysType;
 +import org.apache.calcite.adapter.java.JavaTypeFactory;
 +import org.apache.calcite.avatica.util.ByteString;
 +import org.apache.calcite.avatica.util.DateTimeUtils;
 +import org.apache.calcite.linq4j.function.Function1;
 +import org.apache.calcite.linq4j.tree.BlockBuilder;
 +import org.apache.calcite.linq4j.tree.BlockStatement;
 +import org.apache.calcite.linq4j.tree.CatchBlock;
 +import org.apache.calcite.linq4j.tree.ConstantExpression;
 +import org.apache.calcite.linq4j.tree.Expression;
 +import org.apache.calcite.linq4j.tree.Expressions;
 +import org.apache.calcite.linq4j.tree.ParameterExpression;
 +import org.apache.calcite.linq4j.tree.Primitive;
 +import org.apache.calcite.linq4j.tree.Statement;
 +import org.apache.calcite.rel.type.RelDataType;
 +import org.apache.calcite.rex.RexBuilder;
 +import org.apache.calcite.rex.RexCall;
 +import org.apache.calcite.rex.RexCorrelVariable;
 +import org.apache.calcite.rex.RexDynamicParam;
 +import org.apache.calcite.rex.RexFieldAccess;
 +import org.apache.calcite.rex.RexInputRef;
 +import org.apache.calcite.rex.RexLiteral;
 +import org.apache.calcite.rex.RexLocalRef;
 +import org.apache.calcite.rex.RexNode;
 +import org.apache.calcite.rex.RexOver;
 +import org.apache.calcite.rex.RexPatternFieldRef;
 +import org.apache.calcite.rex.RexProgram;
 +import org.apache.calcite.rex.RexRangeRef;
 +import org.apache.calcite.rex.RexSubQuery;
 +import org.apache.calcite.rex.RexTableInputRef;
 +import org.apache.calcite.rex.RexUtil;
 +import org.apache.calcite.rex.RexVisitor;
 +import org.apache.calcite.runtime.GeoFunctions;
 +import org.apache.calcite.runtime.Geometries;
 +import org.apache.calcite.sql.SqlIntervalQualifier;
 +import org.apache.calcite.sql.SqlOperator;
 +import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 +import org.apache.calcite.sql.parser.SqlParserPos;
 +import org.apache.calcite.sql.type.SqlTypeName;
 +import org.apache.calcite.sql.type.SqlTypeUtil;
 +import org.apache.calcite.sql.validate.SqlConformance;
 +import org.apache.calcite.util.BuiltInMethod;
 +import org.apache.calcite.util.ControlFlowException;
 +import org.apache.calcite.util.Pair;
 +import org.apache.ignite.internal.processors.query.calcite.util.IgniteMethod;
 +
 +import static org.apache.calcite.sql.fun.SqlStdOperatorTable.CASE;
 +import static org.apache.calcite.sql.fun.SqlStdOperatorTable.SEARCH;
 +
 +/**
 + * Translates {@link RexNode REX expressions} to {@link Expression linq4j expressions}.
 + */
 +public class RexToLixTranslator implements RexVisitor<RexToLixTranslator.Result> {
 +    /** */
 +    final JavaTypeFactory typeFactory;
 +
 +    /** */
 +    final RexBuilder builder;
 +
 +    /** */
 +    private final RexProgram program;
 +
 +    /** */
 +    final SqlConformance conformance;
 +
 +    /** */
 +    private final Expression root;
 +
 +    /** */
 +    final RexToLixTranslator.InputGetter inputGetter;
 +
 +    /** */
 +    private final BlockBuilder list;
 +
 +    /** */
 +    private final Function1<String, InputGetter> correlates;
 +
 +    /**
 +     * Map from RexLiteral's variable name to its literal, which is often a ({@link ConstantExpression})) It is used in
 +     * the some {@code RexCall}'s implementors, such as {@code ExtractImplementor}.
 +     *
 +     * @see #getLiteral
 +     * @see #getLiteralValue
 +     */
 +    private final Map<Expression, Expression> literalMap = new HashMap<>();
 +
 +    /**
 +     * For {@code RexCall}, keep the list of its operand's {@code Result}. It is useful when creating a {@code
 +     * CallImplementor}.
 +     */
 +    private final Map<RexCall, List<Result>> callOperandResultMap = new HashMap<>();
 +
 +    /**
 +     * Map from RexNode under specific storage type to its Result, to avoid generating duplicate code. For {@code
 +     * RexInputRef}, {@code RexDynamicParam} and {@code RexFieldAccess}.
 +     */
 +    private final Map<Pair<RexNode, Type>, Result> rexWithStorageTypeResultMap = new HashMap<>();
 +
 +    /**
 +     * Map from RexNode to its Result, to avoid generating duplicate code. For {@code RexLiteral} and {@code RexCall}.
 +     */
 +    private final Map<RexNode, Result> rexResultMap = new HashMap<>();
 +
 +    /** */
 +    private Type currentStorageType;
 +
 +    /** */
 +    private RexToLixTranslator(RexProgram program,
 +        JavaTypeFactory typeFactory,
 +        Expression root,
 +        InputGetter inputGetter,
 +        BlockBuilder list,
 +        RexBuilder builder,
 +        SqlConformance conformance,
 +        Function1<String, InputGetter> correlates) {
 +        this.program = program; // may be null
 +        this.typeFactory = Objects.requireNonNull(typeFactory);
 +        this.conformance = Objects.requireNonNull(conformance);
 +        this.root = Objects.requireNonNull(root);
 +        this.inputGetter = inputGetter;
 +        this.list = Objects.requireNonNull(list);
 +        this.builder = Objects.requireNonNull(builder);
 +        this.correlates = correlates; // may be null
 +    }
 +
 +    /**
 +     * Translates a {@link RexProgram} to a sequence of expressions and declarations.
 +     *
 +     * @param program Program to be translated
 +     * @param typeFactory Type factory
 +     * @param conformance SQL conformance
 +     * @param list List of statements, populated with declarations
 +     * @param outputPhysType Output type, or null
 +     * @param root Root expression
 +     * @param inputGetter Generates expressions for inputs
 +     * @param correlates Provider of references to the values of correlated variables
 +     * @return Sequence of expressions, optional condition
 +     */
 +    public static List<Expression> translateProjects(RexProgram program,
 +        JavaTypeFactory typeFactory, SqlConformance conformance,
 +        BlockBuilder list, PhysType outputPhysType, Expression root,
 +        InputGetter inputGetter, Function1<String, InputGetter> correlates) {
 +        List<Type> storageTypes = null;
 +        if (outputPhysType != null) {
 +            final RelDataType rowType = outputPhysType.getRowType();
 +            storageTypes = new ArrayList<>(rowType.getFieldCount());
 +            for (int i = 0; i < rowType.getFieldCount(); i++)
 +                storageTypes.add(outputPhysType.getJavaFieldType(i));
 +        }
 +        return new RexToLixTranslator(program, typeFactory, root, inputGetter,
 +            list, new RexBuilder(typeFactory), conformance, null)
 +            .setCorrelates(correlates)
 +            .translateList(program.getProjectList(), storageTypes);
 +    }
 +
 +    /** */
 +    Expression translate(RexNode expr) {
 +        final RexImpTable.NullAs nullAs =
 +            RexImpTable.NullAs.of(isNullable(expr));
 +        return translate(expr, nullAs);
 +    }
 +
 +    /** */
 +    Expression translate(RexNode expr, RexImpTable.NullAs nullAs) {
 +        return translate(expr, nullAs, null);
 +    }
 +
 +    /** */
 +    Expression translate(RexNode expr, Type storageType) {
 +        final RexImpTable.NullAs nullAs =
 +            RexImpTable.NullAs.of(isNullable(expr));
 +        return translate(expr, nullAs, storageType);
 +    }
 +
 +    /** */
 +    Expression translate(RexNode expr, RexImpTable.NullAs nullAs,
 +        Type storageType) {
 +        currentStorageType = storageType;
 +        final Result result = expr.accept(this);
 +        final Expression translated =
 +            ConverterUtils.toInternal(result.valueVariable, storageType);
 +        assert translated != null;
 +        // When we asked for not null input that would be stored as box, avoid unboxing
 +        if (RexImpTable.NullAs.NOT_POSSIBLE == nullAs
 +            && translated.type.equals(storageType))
 +            return translated;
 +
 +        return nullAs.handle(translated);
 +    }
 +
 +    /** */
 +    Expression translateCast(
 +        RelDataType sourceType,
 +        RelDataType targetType,
 +        Expression operand) {
 +        Expression convert = null;
 +        switch (targetType.getSqlTypeName()) {
 +            case ANY:
 +                convert = operand;
 +                break;
 +            case DATE:
 +                switch (sourceType.getSqlTypeName()) {
 +                    case CHAR:
 +                    case VARCHAR:
 +                        convert =
 +                            Expressions.call(BuiltInMethod.STRING_TO_DATE.method, operand);
 +                        break;
 +                    case TIMESTAMP:
 +                        convert = Expressions.convert_(
 +                            Expressions.call(BuiltInMethod.FLOOR_DIV.method,
 +                                operand, Expressions.constant(DateTimeUtils.MILLIS_PER_DAY)),
 +                            int.class);
 +                        break;
 +                    case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
 +                        convert = RexImpTable.optimize2(
 +                            operand,
 +                            Expressions.call(
 +                                BuiltInMethod.TIMESTAMP_WITH_LOCAL_TIME_ZONE_TO_DATE.method,
 +                                operand,
 +                                Expressions.call(BuiltInMethod.TIME_ZONE.method, root)));
 +                }
 +                break;
 +            case TIME:
 +                switch (sourceType.getSqlTypeName()) {
 +                    case CHAR:
 +                    case VARCHAR:
 +                        convert =
 +                            Expressions.call(BuiltInMethod.STRING_TO_TIME.method, operand);
 +                        break;
 +                    case TIME_WITH_LOCAL_TIME_ZONE:
 +                        convert = RexImpTable.optimize2(
 +                            operand,
 +                            Expressions.call(
 +                                BuiltInMethod.TIME_WITH_LOCAL_TIME_ZONE_TO_TIME.method,
 +                                operand,
 +                                Expressions.call(BuiltInMethod.TIME_ZONE.method, root)));
 +                        break;
 +                    case TIMESTAMP:
 +                        convert = Expressions.convert_(
 +                            Expressions.call(
 +                                BuiltInMethod.FLOOR_MOD.method,
 +                                operand,
 +                                Expressions.constant(DateTimeUtils.MILLIS_PER_DAY)),
 +                            int.class);
 +                        break;
 +                    case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
 +                        convert = RexImpTable.optimize2(
 +                            operand,
 +                            Expressions.call(
 +                                BuiltInMethod.TIMESTAMP_WITH_LOCAL_TIME_ZONE_TO_TIME.method,
 +                                operand,
 +                                Expressions.call(BuiltInMethod.TIME_ZONE.method, root)));
 +                }
 +                break;
 +            case TIME_WITH_LOCAL_TIME_ZONE:
 +                switch (sourceType.getSqlTypeName()) {
 +                    case CHAR:
 +                    case VARCHAR:
 +                        convert =
 +                            Expressions.call(BuiltInMethod.STRING_TO_TIME_WITH_LOCAL_TIME_ZONE.method, operand);
 +                        break;
 +                    case TIME:
 +                        convert = Expressions.call(
 +                            BuiltInMethod.TIME_STRING_TO_TIME_WITH_LOCAL_TIME_ZONE.method,
 +                            RexImpTable.optimize2(
 +                                operand,
 +                                Expressions.call(
 +                                    BuiltInMethod.UNIX_TIME_TO_STRING.method,
 +                                    operand)),
 +                            Expressions.call(BuiltInMethod.TIME_ZONE.method, root));
 +                        break;
 +                    case TIMESTAMP:
 +                        convert = Expressions.call(
 +                            BuiltInMethod.TIMESTAMP_STRING_TO_TIMESTAMP_WITH_LOCAL_TIME_ZONE.method,
 +                            RexImpTable.optimize2(
 +                                operand,
 +                                Expressions.call(
 +                                    BuiltInMethod.UNIX_TIMESTAMP_TO_STRING.method,
 +                                    operand)),
 +                            Expressions.call(BuiltInMethod.TIME_ZONE.method, root));
 +                        break;
 +                    case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
 +                        convert = RexImpTable.optimize2(
 +                            operand,
 +                            Expressions.call(
 +                                BuiltInMethod.TIMESTAMP_WITH_LOCAL_TIME_ZONE_TO_TIME_WITH_LOCAL_TIME_ZONE.method,
 +                                operand));
 +                }
 +                break;
 +            case TIMESTAMP:
 +                switch (sourceType.getSqlTypeName()) {
 +                    case CHAR:
 +                    case VARCHAR:
 +                        convert =
 +                            Expressions.call(BuiltInMethod.STRING_TO_TIMESTAMP.method, operand);
 +                        break;
 +                    case DATE:
 +                        convert = Expressions.multiply(
 +                            Expressions.convert_(operand, long.class),
 +                            Expressions.constant(DateTimeUtils.MILLIS_PER_DAY));
 +                        break;
 +                    case TIME:
 +                        convert =
 +                            Expressions.add(
 +                                Expressions.multiply(
 +                                    Expressions.convert_(
 +                                        Expressions.call(BuiltInMethod.CURRENT_DATE.method, root),
 +                                        long.class),
 +                                    Expressions.constant(DateTimeUtils.MILLIS_PER_DAY)),
 +                                Expressions.convert_(operand, long.class));
 +                        break;
 +                    case TIME_WITH_LOCAL_TIME_ZONE:
 +                        convert = RexImpTable.optimize2(
 +                            operand,
 +                            Expressions.call(
 +                                BuiltInMethod.TIME_WITH_LOCAL_TIME_ZONE_TO_TIMESTAMP.method,
 +                                Expressions.call(
 +                                    BuiltInMethod.UNIX_DATE_TO_STRING.method,
 +                                    Expressions.call(BuiltInMethod.CURRENT_DATE.method, root)),
 +                                operand,
 +                                Expressions.call(BuiltInMethod.TIME_ZONE.method, root)));
 +                        break;
 +                    case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
 +                        convert = RexImpTable.optimize2(
 +                            operand,
 +                            Expressions.call(
 +                                BuiltInMethod.TIMESTAMP_WITH_LOCAL_TIME_ZONE_TO_TIMESTAMP.method,
 +                                operand,
 +                                Expressions.call(BuiltInMethod.TIME_ZONE.method, root)));
 +                }
 +                break;
 +            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
 +                switch (sourceType.getSqlTypeName()) {
 +                    case CHAR:
 +                    case VARCHAR:
 +                        convert =
 +                            Expressions.call(
 +                                BuiltInMethod.STRING_TO_TIMESTAMP_WITH_LOCAL_TIME_ZONE.method,
 +                                operand);
 +                        break;
 +                    case DATE:
 +                        convert = Expressions.call(
 +                            BuiltInMethod.TIMESTAMP_STRING_TO_TIMESTAMP_WITH_LOCAL_TIME_ZONE.method,
 +                            RexImpTable.optimize2(
 +                                operand,
 +                                Expressions.call(
 +                                    BuiltInMethod.UNIX_TIMESTAMP_TO_STRING.method,
 +                                    Expressions.multiply(
 +                                        Expressions.convert_(operand, long.class),
 +                                        Expressions.constant(DateTimeUtils.MILLIS_PER_DAY)))),
 +                            Expressions.call(BuiltInMethod.TIME_ZONE.method, root));
 +                        break;
 +                    case TIME:
 +                        convert = Expressions.call(
 +                            BuiltInMethod.TIMESTAMP_STRING_TO_TIMESTAMP_WITH_LOCAL_TIME_ZONE.method,
 +                            RexImpTable.optimize2(
 +                                operand,
 +                                Expressions.call(
 +                                    BuiltInMethod.UNIX_TIMESTAMP_TO_STRING.method,
 +                                    Expressions.add(
 +                                        Expressions.multiply(
 +                                            Expressions.convert_(
 +                                                Expressions.call(BuiltInMethod.CURRENT_DATE.method, root),
 +                                                long.class),
 +                                            Expressions.constant(DateTimeUtils.MILLIS_PER_DAY)),
 +                                        Expressions.convert_(operand, long.class)))),
 +                            Expressions.call(BuiltInMethod.TIME_ZONE.method, root));
 +                        break;
 +                    case TIME_WITH_LOCAL_TIME_ZONE:
 +                        convert = RexImpTable.optimize2(
 +                            operand,
 +                            Expressions.call(
 +                                BuiltInMethod.TIME_WITH_LOCAL_TIME_ZONE_TO_TIMESTAMP_WITH_LOCAL_TIME_ZONE.method,
 +                                Expressions.call(
 +                                    BuiltInMethod.UNIX_DATE_TO_STRING.method,
 +                                    Expressions.call(BuiltInMethod.CURRENT_DATE.method, root)),
 +                                operand));
 +                        break;
 +                    case TIMESTAMP:
 +                        convert = Expressions.call(
 +                            BuiltInMethod.TIMESTAMP_STRING_TO_TIMESTAMP_WITH_LOCAL_TIME_ZONE.method,
 +                            RexImpTable.optimize2(
 +                                operand,
 +                                Expressions.call(
 +                                    BuiltInMethod.UNIX_TIMESTAMP_TO_STRING.method,
 +                                    operand)),
 +                            Expressions.call(BuiltInMethod.TIME_ZONE.method, root));
 +                }
 +                break;
 +            case BOOLEAN:
 +                switch (sourceType.getSqlTypeName()) {
 +                    case CHAR:
 +                    case VARCHAR:
 +                        convert = Expressions.call(
 +                            BuiltInMethod.STRING_TO_BOOLEAN.method,
 +                            operand);
 +                }
 +                break;
 +            case CHAR:
 +            case VARCHAR:
 +                final SqlIntervalQualifier interval =
 +                    sourceType.getIntervalQualifier();
 +                switch (sourceType.getSqlTypeName()) {
 +                    case DATE:
 +                        convert = RexImpTable.optimize2(
 +                            operand,
 +                            Expressions.call(
 +                                BuiltInMethod.UNIX_DATE_TO_STRING.method,
 +                                operand));
 +                        break;
 +                    case TIME:
 +                        convert = RexImpTable.optimize2(
 +                            operand,
 +                            Expressions.call(
 +                                BuiltInMethod.UNIX_TIME_TO_STRING.method,
 +                                operand));
 +                        break;
 +                    case TIME_WITH_LOCAL_TIME_ZONE:
 +                        convert = RexImpTable.optimize2(
 +                            operand,
 +                            Expressions.call(
 +                                BuiltInMethod.TIME_WITH_LOCAL_TIME_ZONE_TO_STRING.method,
 +                                operand,
 +                                Expressions.call(BuiltInMethod.TIME_ZONE.method, root)));
 +                        break;
 +                    case TIMESTAMP:
 +                        convert = RexImpTable.optimize2(
 +                            operand,
 +                            Expressions.call(
 +                                BuiltInMethod.UNIX_TIMESTAMP_TO_STRING.method,
 +                                operand));
 +                        break;
 +                    case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
 +                        convert = RexImpTable.optimize2(
 +                            operand,
 +                            Expressions.call(
 +                                BuiltInMethod.TIMESTAMP_WITH_LOCAL_TIME_ZONE_TO_STRING.method,
 +                                operand,
 +                                Expressions.call(BuiltInMethod.TIME_ZONE.method, root)));
 +                        break;
 +                    case INTERVAL_YEAR:
 +                    case INTERVAL_YEAR_MONTH:
 +                    case INTERVAL_MONTH:
 +                        convert = RexImpTable.optimize2(
 +                            operand,
 +                            Expressions.call(
 +                                BuiltInMethod.INTERVAL_YEAR_MONTH_TO_STRING.method,
 +                                operand,
 +                                Expressions.constant(interval.timeUnitRange)));
 +                        break;
 +                    case INTERVAL_DAY:
 +                    case INTERVAL_DAY_HOUR:
 +                    case INTERVAL_DAY_MINUTE:
 +                    case INTERVAL_DAY_SECOND:
 +                    case INTERVAL_HOUR:
 +                    case INTERVAL_HOUR_MINUTE:
 +                    case INTERVAL_HOUR_SECOND:
 +                    case INTERVAL_MINUTE:
 +                    case INTERVAL_MINUTE_SECOND:
 +                    case INTERVAL_SECOND:
 +                        convert = RexImpTable.optimize2(
 +                            operand,
 +                            Expressions.call(
 +                                BuiltInMethod.INTERVAL_DAY_TIME_TO_STRING.method,
 +                                operand,
 +                                Expressions.constant(interval.timeUnitRange),
 +                                Expressions.constant(
 +                                    interval.getFractionalSecondPrecision(
 +                                        typeFactory.getTypeSystem()))));
 +                        break;
 +                    case BOOLEAN:
 +                        convert = RexImpTable.optimize2(
 +                            operand,
 +                            Expressions.call(
 +                                BuiltInMethod.BOOLEAN_TO_STRING.method,
 +                                operand));
 +                        break;
 +                    case BINARY:
 +                    case VARBINARY:
 +                        convert = RexImpTable.optimize2(
 +                            operand,
 +                            Expressions.call(IgniteMethod.BYTESTRING_TO_STRING.method(), operand));
 +                        break;
 +                }
 +                break;
 +            case INTERVAL_YEAR:
 +            case INTERVAL_YEAR_MONTH:
 +            case INTERVAL_MONTH:
 +            case INTERVAL_DAY:
 +            case INTERVAL_DAY_HOUR:
 +            case INTERVAL_DAY_MINUTE:
 +            case INTERVAL_DAY_SECOND:
 +            case INTERVAL_HOUR:
 +            case INTERVAL_HOUR_MINUTE:
 +            case INTERVAL_HOUR_SECOND:
 +            case INTERVAL_MINUTE:
 +            case INTERVAL_MINUTE_SECOND:
 +            case INTERVAL_SECOND:
 +                switch (sourceType.getSqlTypeName().getFamily()) {
 +                    case CHARACTER:
 +                        SqlIntervalQualifier intervalQualifier = targetType.getIntervalQualifier();
 +
 +                        Method method = intervalQualifier.isYearMonth() ?
 +                            IgniteMethod.PARSE_INTERVAL_YEAR_MONTH.method() :
 +                            IgniteMethod.PARSE_INTERVAL_DAY_TIME.method();
 +
 +                        convert = Expressions.call(
 +                            method,
 +                            operand,
 +                            Expressions.new_(SqlIntervalQualifier.class,
 +                                Expressions.constant(intervalQualifier.getStartUnit()),
 +                                Expressions.constant(intervalQualifier.getStartPrecisionPreservingDefault()),
 +                                Expressions.constant(intervalQualifier.getEndUnit()),
 +                                Expressions.constant(intervalQualifier.getFractionalSecondPrecisionPreservingDefault()),
 +                                Expressions.field(null, SqlParserPos.class, "ZERO")
 +                            )
 +                        );
 +                }
 +                break;
 +            case BINARY:
 +            case VARBINARY:
 +                switch (sourceType.getSqlTypeName().getFamily()) {
 +                    case CHARACTER:
 +                        convert = Expressions.call(IgniteMethod.STRING_TO_BYTESTRING.method(), operand);
 +                }
 +                break;
 +        }
 +        if (targetType.getSqlTypeName() == SqlTypeName.DECIMAL)
 +            convert = ConverterUtils.convertToDecimal(operand, targetType);
 +
 +        if (convert == null)
 +            convert = ConverterUtils.convert(operand, typeFactory.getJavaClass(targetType));
 +
 +        // Going from anything to CHAR(n) or VARCHAR(n), make sure value is no
 +        // longer than n.
 +        boolean pad = false;
 +        boolean truncate = true;
 +        switch (targetType.getSqlTypeName()) {
 +            case CHAR:
 +            case BINARY:
 +                pad = true;
 +                // fall through
 +            case VARCHAR:
 +            case VARBINARY:
 +                final int targetPrecision = targetType.getPrecision();
 +                if (targetPrecision >= 0) {
 +                    switch (sourceType.getSqlTypeName()) {
 +                        case CHAR:
 +                        case VARCHAR:
 +                        case BINARY:
 +                        case VARBINARY:
 +                            // If this is a widening cast, no need to truncate.
 +                            final int sourcePrecision = sourceType.getPrecision();
 +                            if (SqlTypeUtil.comparePrecision(sourcePrecision, targetPrecision) <= 0)
 +                                truncate = false;
 +
 +                            // If this is a widening cast, no need to pad.
 +                            if (SqlTypeUtil.comparePrecision(sourcePrecision, targetPrecision) >= 0)
 +                                pad = false;
 +
 +                            // fall through
 +                        default:
 +                            if (truncate || pad) {
 +                                convert =
 +                                    Expressions.call(
 +                                        pad
 +                                            ? BuiltInMethod.TRUNCATE_OR_PAD.method
 +                                            : BuiltInMethod.TRUNCATE.method,
 +                                        convert,
 +                                        Expressions.constant(targetPrecision));
 +                            }
 +                    }
 +                }
 +                break;
 +            case TIMESTAMP:
 +                int targetScale = targetType.getScale();
 +                if (targetScale == RelDataType.SCALE_NOT_SPECIFIED)
 +                    targetScale = 0;
 +
 +                if (targetScale < sourceType.getScale()) {
 +                    convert =
 +                        Expressions.call(
 +                            BuiltInMethod.ROUND_LONG.method,
 +                            convert,
 +                            Expressions.constant(
 +                                (long)Math.pow(10, 3 - targetScale)));
 +                }
 +                break;
 +            case INTERVAL_YEAR:
 +            case INTERVAL_YEAR_MONTH:
 +            case INTERVAL_MONTH:
 +            case INTERVAL_DAY:
 +            case INTERVAL_DAY_HOUR:
 +            case INTERVAL_DAY_MINUTE:
 +            case INTERVAL_DAY_SECOND:
 +            case INTERVAL_HOUR:
 +            case INTERVAL_HOUR_MINUTE:
 +            case INTERVAL_HOUR_SECOND:
 +            case INTERVAL_MINUTE:
 +            case INTERVAL_MINUTE_SECOND:
 +            case INTERVAL_SECOND:
 +                switch (sourceType.getSqlTypeName().getFamily()) {
 +                    case NUMERIC:
 +                        final BigDecimal multiplier = targetType.getSqlTypeName().getEndUnit().multiplier;
 +                        final BigDecimal divider = BigDecimal.ONE;
 +                        convert = RexImpTable.multiplyDivide(convert, multiplier, divider);
 +                }
 +        }
 +        return scaleIntervalToNumber(sourceType, targetType, convert);
 +    }
 +
 +    /**
 +     * Dereferences an expression if it is a {@link RexLocalRef}.
 +     */
 +    public RexNode deref(RexNode expr) {
 +        if (expr instanceof RexLocalRef) {
 +            RexLocalRef ref = (RexLocalRef)expr;
 +            final RexNode e2 = program.getExprList().get(ref.getIndex());
 +            assert ref.getType().equals(e2.getType());
 +            return e2;
 +        }
 +
 +        return expr;
 +    }
 +
 +    /**
 +     * Translates a literal.
 +     *
 +     * @throws ControlFlowException if literal is null but {@code nullAs} is {@link RexImpTable.NullAs#NOT_POSSIBLE}.
 +     */
 +    public static Expression translateLiteral(
 +        RexLiteral literal,
 +        RelDataType type,
 +        JavaTypeFactory typeFactory,
 +        RexImpTable.NullAs nullAs) {
 +        if (literal.isNull()) {
 +            switch (nullAs) {
 +                case TRUE:
 +                case IS_NULL:
 +                    return RexImpTable.TRUE_EXPR;
 +                case FALSE:
 +                case IS_NOT_NULL:
 +                    return RexImpTable.FALSE_EXPR;
 +                case NOT_POSSIBLE:
 +                    throw new ControlFlowException();
 +                case NULL:
 +                default:
 +                    return RexImpTable.NULL_EXPR;
 +            }
 +        }
 +        else {
 +            switch (nullAs) {
 +                case IS_NOT_NULL:
 +                    return RexImpTable.TRUE_EXPR;
 +                case IS_NULL:
 +                    return RexImpTable.FALSE_EXPR;
 +            }
 +        }
 +        Type javaClass = typeFactory.getJavaClass(type);
 +        final Object value2;
 +        switch (literal.getType().getSqlTypeName()) {
 +            case DECIMAL:
 +                final BigDecimal bd = literal.getValueAs(BigDecimal.class);
 +                if (javaClass == float.class)
 +                    return Expressions.constant(bd, javaClass);
 +                else if (javaClass == double.class)
 +                    return Expressions.constant(bd, javaClass);
 +                assert javaClass == BigDecimal.class;
 +                return Expressions.call(
 +                    IgniteSqlFunctions.class,
 +                    "toBigDecimal",
 +                    /*
 +                    The ConstantExpression class, when converting from BigDecimal to Bigdecimal,
 +                    removes trailing zeros from the original object, regardless of the original scale value.
 +                    Therefore, BigDecimal must be converted to a string to avoid this.
 +                     */
 +                    Expressions.constant(bd.toString()),
 +                    Expressions.constant(type.getPrecision()),
 +                    Expressions.constant(type.getScale()));
 +            case DATE:
 +            case TIME:
 +            case TIME_WITH_LOCAL_TIME_ZONE:
 +            case INTERVAL_YEAR:
 +            case INTERVAL_YEAR_MONTH:
 +            case INTERVAL_MONTH:
 +                value2 = literal.getValueAs(Integer.class);
 +                javaClass = int.class;
 +                break;
 +            case TIMESTAMP:
 +            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
 +            case INTERVAL_DAY:
 +            case INTERVAL_DAY_HOUR:
 +            case INTERVAL_DAY_MINUTE:
 +            case INTERVAL_DAY_SECOND:
 +            case INTERVAL_HOUR:
 +            case INTERVAL_HOUR_MINUTE:
 +            case INTERVAL_HOUR_SECOND:
 +            case INTERVAL_MINUTE:
 +            case INTERVAL_MINUTE_SECOND:
 +            case INTERVAL_SECOND:
 +                value2 = literal.getValueAs(Long.class);
 +                javaClass = long.class;
 +                break;
 +            case CHAR:
 +            case VARCHAR:
 +                value2 = literal.getValueAs(String.class);
 +                break;
 +            case BINARY:
 +            case VARBINARY:
 +                return Expressions.new_(
 +                    ByteString.class,
 +                    Expressions.constant(
 +                        literal.getValueAs(byte[].class),
 +                        byte[].class));
 +            case GEOMETRY:
 +                final Geometries.Geom geom = literal.getValueAs(Geometries.Geom.class);
 +                final String wkt = GeoFunctions.ST_AsWKT(geom);
 +                return Expressions.call(null, BuiltInMethod.ST_GEOM_FROM_TEXT.method,
 +                    Expressions.constant(wkt));
 +            case SYMBOL:
 +                value2 = literal.getValueAs(Enum.class);
 +                javaClass = value2.getClass();
 +                break;
 +            default:
 +                final Primitive primitive = Primitive.ofBoxOr(javaClass);
 +                final Comparable value = literal.getValueAs(Comparable.class);
 +
 +                value2 = primitive != null && value instanceof Number ? primitive.number((Number)value) : value;
 +        }
 +        return Expressions.constant(value2, javaClass);
 +    }
 +
 +    /** */
 +    public List<Expression> translateList(
 +        List<RexNode> operandList,
 +        RexImpTable.NullAs nullAs) {
 +        return translateList(operandList, nullAs,
 +            ConverterUtils.internalTypes(operandList));
 +    }
 +
 +    /** */
 +    public List<Expression> translateList(
 +        List<RexNode> operandList,
 +        RexImpTable.NullAs nullAs,
 +        List<? extends Type> storageTypes) {
 +        final List<Expression> list = new ArrayList<>();
 +        for (Pair<RexNode, ? extends Type> e : Pair.zip(operandList, storageTypes))
 +            list.add(translate(e.left, nullAs, e.right));
 +
 +        return list;
 +    }
 +
 +    /**
 +     * Translates the list of {@code RexNode}, using the default output types. This might be suboptimal in terms of
 +     * additional box-unbox when you use the translation later. If you know the java class that will be used to store
 +     * the results, use {@link RexToLixTranslator#translateList(List, List)} version.
 +     *
 +     * @param operandList list of RexNodes to translate
 +     * @return translated expressions
 +     */
 +    public List<Expression> translateList(List<? extends RexNode> operandList) {
 +        return translateList(operandList, ConverterUtils.internalTypes(operandList));
 +    }
 +
 +    /**
 +     * Translates the list of {@code RexNode}, while optimizing for output storage. For instance, if the result of
 +     * translation is going to be stored in {@code Object[]}, and the input is {@code Object[]} as well, then translator
 +     * will avoid casting, boxing, etc.
 +     *
 +     * @param operandList list of RexNodes to translate
 +     * @param storageTypes hints of the java classes that will be used to store translation results. Use null to use
 +     * default storage type
 +     * @return translated expressions
 +     */
 +    public List<Expression> translateList(List<? extends RexNode> operandList,
 +        List<? extends Type> storageTypes) {
 +        final List<Expression> list = new ArrayList<>(operandList.size());
 +
 +        for (int i = 0; i < operandList.size(); i++) {
 +            RexNode rex = operandList.get(i);
 +            Type desiredType = null;
 +            if (storageTypes != null)
 +                desiredType = storageTypes.get(i);
 +
 +            final Expression translate = translate(rex, desiredType);
 +            list.add(translate);
 +            // desiredType is still a hint, thus we might get any kind of output
 +            // (boxed or not) when hint was provided.
 +            // It is favourable to get the type matching desired type
 +            if (desiredType == null && !isNullable(rex)) {
 +                assert !Primitive.isBox(translate.getType())
 +                    : "Not-null boxed primitive should come back as primitive: "
 +                    + rex + ", " + translate.getType();
 +            }
 +        }
 +        return list;
 +    }
 +
 +    /**
 +     * Returns whether an expression is nullable.
 +     *
 +     * @param e Expression
 +     * @return Whether expression is nullable
 +     */
 +    public boolean isNullable(RexNode e) {
 +        return e.getType().isNullable();
 +    }
 +
 +    /** */
 +    public RexToLixTranslator setBlock(BlockBuilder block) {
 +        if (block == list)
 +            return this;
 +
 +        return new RexToLixTranslator(program, typeFactory, root, inputGetter,
 +            block, builder, conformance, correlates);
 +    }
 +
 +    /** */
 +    public RexToLixTranslator setCorrelates(
 +        Function1<String, InputGetter> correlates) {
 +        if (this.correlates == correlates)
 +            return this;
 +
 +        return new RexToLixTranslator(program, typeFactory, root, inputGetter, list,
 +            builder, conformance, correlates);
 +    }
 +
 +    /** */
 +    public Expression getRoot() {
 +        return root;
 +    }
 +
 +    /** */
 +    private static Expression scaleIntervalToNumber(
 +        RelDataType sourceType,
 +        RelDataType targetType,
 +        Expression operand) {
 +        switch (targetType.getSqlTypeName().getFamily()) {
 +            case NUMERIC:
 +                switch (sourceType.getSqlTypeName()) {
 +                    case INTERVAL_YEAR:
 +                    case INTERVAL_YEAR_MONTH:
 +                    case INTERVAL_MONTH:
 +                    case INTERVAL_DAY:
 +                    case INTERVAL_DAY_HOUR:
 +                    case INTERVAL_DAY_MINUTE:
 +                    case INTERVAL_DAY_SECOND:
 +                    case INTERVAL_HOUR:
 +                    case INTERVAL_HOUR_MINUTE:
 +                    case INTERVAL_HOUR_SECOND:
 +                    case INTERVAL_MINUTE:
 +                    case INTERVAL_MINUTE_SECOND:
 +                    case INTERVAL_SECOND:
 +                        // Scale to the given field.
 +                        final BigDecimal multiplier = BigDecimal.ONE;
 +                        final BigDecimal divider =
 +                            sourceType.getSqlTypeName().getEndUnit().multiplier;
 +                        return RexImpTable.multiplyDivide(operand, multiplier, divider);
 +                }
 +        }
 +        return operand;
 +    }
 +
 +    /**
 +     * Visit {@code RexInputRef}. If it has never been visited under current storage type before, {@code
 +     * RexToLixTranslator} generally produces three lines of code. For example, when visiting a column (named
 +     * commission) in table Employee, the generated code snippet is: {@code final Employee current =(Employee)
 +     * inputEnumerator.current(); final Integer input_value = current.commission; final boolean input_isNull =
 +     * input_value == null; }
 +     */
 +    @Override public Result visitInputRef(RexInputRef inputRef) {
 +        final Pair<RexNode, Type> key = Pair.of(inputRef, currentStorageType);
 +        // If the RexInputRef has been visited under current storage type already,
 +        // it is not necessary to visit it again, just return the result.
 +        if (rexWithStorageTypeResultMap.containsKey(key))
 +            return rexWithStorageTypeResultMap.get(key);
 +
 +        // Generate one line of code to get the input, e.g.,
 +        // "final Employee current =(Employee) inputEnumerator.current();"
 +        final Expression valueExpression = inputGetter.field(
 +            list, inputRef.getIndex(), currentStorageType);
 +
 +        // Generate one line of code for the value of RexInputRef, e.g.,
 +        // "final Integer input_value = current.commission;"
 +        final ParameterExpression valueVariable =
 +            Expressions.parameter(
 +                valueExpression.getType(), list.newName("input_value"));
 +        list.add(Expressions.declare(Modifier.FINAL, valueVariable, valueExpression));
 +
 +        // Generate one line of code to check whether RexInputRef is null, e.g.,
 +        // "final boolean input_isNull = input_value == null;"
 +        final Expression isNullExpression = checkNull(valueVariable);
 +        final ParameterExpression isNullVariable =
 +            Expressions.parameter(
 +                Boolean.TYPE, list.newName("input_isNull"));
 +        list.add(Expressions.declare(Modifier.FINAL, isNullVariable, isNullExpression));
 +
 +        final Result result = new Result(isNullVariable, valueVariable);
 +
 +        // Cache <RexInputRef, currentStorageType>'s result
 +        rexWithStorageTypeResultMap.put(key, result);
 +
 +        return new Result(isNullVariable, valueVariable);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public Result visitLocalRef(RexLocalRef localRef) {
 +        return deref(localRef).accept(this);
 +    }
 +
 +    /**
 +     * Visit {@code RexLiteral}. If it has never been visited before, {@code RexToLixTranslator} will generate two lines
 +     * of code. For example, when visiting a primitive int (10), the generated code snippet is: {@code final int
 +     * literal_value = 10; final boolean literal_isNull = false; }
 +     */
 +    @Override public Result visitLiteral(RexLiteral literal) {
 +        // If the RexLiteral has been visited already, just return the result
 +        if (rexResultMap.containsKey(literal))
 +            return rexResultMap.get(literal);
 +
 +        // Generate one line of code for the value of RexLiteral, e.g.,
 +        // "final int literal_value = 10;"
 +        final Expression valueExpression = literal.isNull()
 +            // Note: even for null literal, we can't loss its type information
 +            ? getTypedNullLiteral(literal)
 +            : translateLiteral(literal, literal.getType(),
 +            typeFactory, RexImpTable.NullAs.NOT_POSSIBLE);
 +        final ParameterExpression valueVariable =
 +            Expressions.parameter(valueExpression.getType(),
 +                list.newName("literal_value"));
 +        list.add(Expressions.declare(Modifier.FINAL, valueVariable, valueExpression));
 +
 +        // Generate one line of code to check whether RexLiteral is null, e.g.,
 +        // "final boolean literal_isNull = false;"
 +        final Expression isNullExpression =
 +            literal.isNull() ? RexImpTable.TRUE_EXPR : RexImpTable.FALSE_EXPR;
 +        final ParameterExpression isNullVariable = Expressions.parameter(
 +            Boolean.TYPE, list.newName("literal_isNull"));
 +        list.add(Expressions.declare(Modifier.FINAL, isNullVariable, isNullExpression));
 +
 +        // Maintain the map from valueVariable (ParameterExpression) to real Expression
 +        literalMap.put(valueVariable, valueExpression);
 +        final Result result = new Result(isNullVariable, valueVariable);
 +        // Cache RexLiteral's result
 +        rexResultMap.put(literal, result);
 +        return result;
 +    }
 +
 +    /**
 +     * Returns an {@code Expression} for null literal without losing its type information.
 +     */
 +    private ConstantExpression getTypedNullLiteral(RexLiteral literal) {
 +        assert literal.isNull();
 +        Type javaClass = typeFactory.getJavaClass(literal.getType());
 +        switch (literal.getType().getSqlTypeName()) {
 +            case DATE:
 +            case TIME:
 +            case TIME_WITH_LOCAL_TIME_ZONE:
 +            case INTERVAL_YEAR:
 +            case INTERVAL_YEAR_MONTH:
 +            case INTERVAL_MONTH:
 +                javaClass = Integer.class;
 +                break;
 +            case TIMESTAMP:
 +            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
 +            case INTERVAL_DAY:
 +            case INTERVAL_DAY_HOUR:
 +            case INTERVAL_DAY_MINUTE:
 +            case INTERVAL_DAY_SECOND:
 +            case INTERVAL_HOUR:
 +            case INTERVAL_HOUR_MINUTE:
 +            case INTERVAL_HOUR_SECOND:
 +            case INTERVAL_MINUTE:
 +            case INTERVAL_MINUTE_SECOND:
 +            case INTERVAL_SECOND:
 +                javaClass = Long.class;
 +                break;
 +        }
 +        return javaClass == null || javaClass == Void.class
 +            ? RexImpTable.NULL_EXPR
 +            : Expressions.constant(null, javaClass);
 +    }
 +
 +    /**
 +     * Visit {@code RexCall}. For most {@code SqlOperator}s, we can get the implementor from {@code RexImpTable}.
 +     * Several operators (e.g., CaseWhen) with special semantics need to be implemented separately.
 +     */
 +    @Override public Result visitCall(RexCall call) {
 +        if (rexResultMap.containsKey(call))
 +            return rexResultMap.get(call);
 +
 +        final SqlOperator operator = call.getOperator();
 +        if (operator == CASE)
 +            return implementCaseWhen(call);
 +
 +        if (operator == SEARCH)
 +            return RexUtil.expandSearch(builder, program, call).accept(this);
 +
 +        final RexImpTable.RexCallImplementor implementor =
 +            RexImpTable.INSTANCE.get(operator);
 +        if (implementor == null)
 +            throw new RuntimeException("cannot translate call " + call);
 +
 +        final List<RexNode> operandList = call.getOperands();
 +        final List<Type> storageTypes = ConverterUtils.internalTypes(operandList);
 +        final List<Result> operandResults = new ArrayList<>();
 +        for (int i = 0; i < operandList.size(); i++) {
 +            final Result operandResult =
 +                implementCallOperand(operandList.get(i), storageTypes.get(i), this);
 +            operandResults.add(operandResult);
 +        }
 +        callOperandResultMap.put(call, operandResults);
 +        final Result result = implementor.implement(this, call, operandResults);
 +        rexResultMap.put(call, result);
 +        return result;
 +    }
 +
 +    /** */
 +    private static Result implementCallOperand(final RexNode operand,
 +        final Type storageType, final RexToLixTranslator translator) {
 +        final Type originalStorageType = translator.currentStorageType;
 +        translator.currentStorageType = storageType;
 +        Result operandResult = operand.accept(translator);
 +        if (storageType != null)
-           operandResult = translator.toInnerStorageType(operandResult, storageType);
++            operandResult = translator.toInnerStorageType(operandResult, storageType);
 +        translator.currentStorageType = originalStorageType;
 +        return operandResult;
 +    }
 +
 +    /** */
 +    private static Expression implementCallOperand2(final RexNode operand,
 +        final Type storageType, final RexToLixTranslator translator) {
 +        final Type originalStorageType = translator.currentStorageType;
 +        translator.currentStorageType = storageType;
 +        final Expression result = translator.translate(operand);
 +        translator.currentStorageType = originalStorageType;
 +        return result;
 +    }
 +
 +    /**
 +     * The CASE operator is SQL’s way of handling if/then logic. Different with other {@code RexCall}s, it is not safe
 +     * to implement its operands first. For example: {@code select case when s=0 then false else 100/s > 0 end from
 +     * (values (1),(0)) ax(s); }
 +     */
 +    private Result implementCaseWhen(RexCall call) {
 +        final Type returnType = typeFactory.getJavaClass(call.getType());
 +        final ParameterExpression valueVariable =
 +            Expressions.parameter(returnType,
 +                list.newName("case_when_value"));
 +        list.add(Expressions.declare(0, valueVariable, null));
 +        final List<RexNode> operandList = call.getOperands();
 +        implementRecursively(this, operandList, valueVariable, 0);
 +        final Expression isNullExpression = checkNull(valueVariable);
 +        final ParameterExpression isNullVariable =
 +            Expressions.parameter(
 +                Boolean.TYPE, list.newName("case_when_isNull"));
 +        list.add(Expressions.declare(Modifier.FINAL, isNullVariable, isNullExpression));
 +        final Result result = new Result(isNullVariable, valueVariable);
 +        rexResultMap.put(call, result);
 +        return result;
 +    }
 +
 +    /**
 +     * Case statements of the form: {@code CASE WHEN a THEN b [WHEN c THEN d]* [ELSE e] END}. When {@code a = true},
 +     * returns {@code b}; when {@code c = true}, returns {@code d}; else returns {@code e}.
 +     *
 +     * <p>We generate code that looks like:
 +     *
 +     * <blockquote><pre>
 +     *      int case_when_value;
 +     *      ......code for a......
 +     *      if (!a_isNull && a_value) {
 +     *          ......code for b......
 +     *          case_when_value = res(b_isNull, b_value);
 +     *      } else {
 +     *          ......code for c......
 +     *          if (!c_isNull && c_value) {
 +     *              ......code for d......
 +     *              case_when_value = res(d_isNull, d_value);
 +     *          } else {
 +     *              ......code for e......
 +     *              case_when_value = res(e_isNull, e_value);
 +     *          }
 +     *      }
 +     * </pre></blockquote>
 +     */
 +    private void implementRecursively(final RexToLixTranslator currentTranslator,
 +        final List<RexNode> operandList, final ParameterExpression valueVariable, int pos) {
 +        final BlockBuilder currentBlockBuilder = currentTranslator.getBlockBuilder();
 +        final List<Type> storageTypes = ConverterUtils.internalTypes(operandList);
 +        // [ELSE] clause
 +        if (pos == operandList.size() - 1) {
 +            Expression res = implementCallOperand2(operandList.get(pos),
 +                storageTypes.get(pos), currentTranslator);
 +            currentBlockBuilder.add(
 +                Expressions.statement(
 +                    Expressions.assign(valueVariable,
 +                        ConverterUtils.convert(res, valueVariable.getType()))));
 +            return;
 +        }
 +        // Condition code: !a_isNull && a_value
 +        final RexNode testerNode = operandList.get(pos);
 +        final Result testerResult = implementCallOperand(testerNode,
 +            storageTypes.get(pos), currentTranslator);
 +        final Expression tester = Expressions.andAlso(
 +            Expressions.not(testerResult.isNullVariable),
 +            testerResult.valueVariable);
 +        // Code for {if} branch
 +        final RexNode ifTrueNode = operandList.get(pos + 1);
 +        final BlockBuilder ifTrueBlockBuilder =
 +            new BlockBuilder(true, currentBlockBuilder);
 +        final RexToLixTranslator ifTrueTranslator =
 +            currentTranslator.setBlock(ifTrueBlockBuilder);
 +        final Expression ifTrueRes = implementCallOperand2(ifTrueNode,
 +            storageTypes.get(pos + 1), ifTrueTranslator);
 +        // Assign the value: case_when_value = ifTrueRes
 +        ifTrueBlockBuilder.add(
 +            Expressions.statement(
 +                Expressions.assign(valueVariable,
 +                    ConverterUtils.convert(ifTrueRes, valueVariable.getType()))));
 +        final BlockStatement ifTrue = ifTrueBlockBuilder.toBlock();
 +        // There is no [ELSE] clause
 +        if (pos + 1 == operandList.size() - 1) {
 +            currentBlockBuilder.add(
 +                Expressions.ifThen(tester, ifTrue));
 +            return;
 +        }
 +        // Generate code for {else} branch recursively
 +        final BlockBuilder ifFalseBlockBuilder =
 +            new BlockBuilder(true, currentBlockBuilder);
 +        final RexToLixTranslator ifFalseTranslator =
 +            currentTranslator.setBlock(ifFalseBlockBuilder);
 +        implementRecursively(ifFalseTranslator, operandList, valueVariable, pos + 2);
 +        final BlockStatement ifFalse = ifFalseBlockBuilder.toBlock();
 +        currentBlockBuilder.add(
 +            Expressions.ifThenElse(tester, ifTrue, ifFalse));
 +    }
 +
 +    /** */
 +    private Result toInnerStorageType(final Result result, final Type storageType) {
 +        final Expression valueExpression =
 +            ConverterUtils.toInternal(result.valueVariable, storageType);
 +        if (valueExpression.equals(result.valueVariable))
 +            return result;
 +
 +        final ParameterExpression valueVariable =
 +            Expressions.parameter(
 +                valueExpression.getType(),
 +                list.newName(result.valueVariable.name + "_inner_type"));
 +        list.add(Expressions.declare(Modifier.FINAL, valueVariable, valueExpression));
 +        final ParameterExpression isNullVariable = result.isNullVariable;
 +        return new Result(isNullVariable, valueVariable);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public Result visitDynamicParam(RexDynamicParam dynamicParam) {
 +        final Pair<RexNode, Type> key = Pair.of(dynamicParam, currentStorageType);
 +        if (rexWithStorageTypeResultMap.containsKey(key))
 +            return rexWithStorageTypeResultMap.get(key);
 +
 +        final Type storageType = currentStorageType != null
 +            ? currentStorageType : typeFactory.getJavaClass(dynamicParam.getType());
 +        final Expression valueExpression = ConverterUtils.convert(
 +            Expressions.call(root, BuiltInMethod.DATA_CONTEXT_GET.method,
 +                Expressions.constant("?" + dynamicParam.getIndex())),
 +            storageType);
 +        final ParameterExpression valueVariable =
 +            Expressions.parameter(valueExpression.getType(), list.newName("value_dynamic_param"));
 +        list.add(Expressions.declare(Modifier.FINAL, valueVariable, valueExpression));
 +        final ParameterExpression isNullVariable =
 +            Expressions.parameter(Boolean.TYPE, list.newName("isNull_dynamic_param"));
 +        list.add(Expressions.declare(Modifier.FINAL, isNullVariable, checkNull(valueVariable)));
 +        final Result result = new Result(isNullVariable, valueVariable);
 +        rexWithStorageTypeResultMap.put(key, result);
 +        return result;
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public Result visitFieldAccess(RexFieldAccess fieldAccess) {
 +        final Pair<RexNode, Type> key = Pair.of(fieldAccess, currentStorageType);
 +        if (rexWithStorageTypeResultMap.containsKey(key))
 +            return rexWithStorageTypeResultMap.get(key);
 +
 +        final RexNode target = deref(fieldAccess.getReferenceExpr());
 +        int fieldIndex = fieldAccess.getField().getIndex();
 +        String fieldName = fieldAccess.getField().getName();
 +        switch (target.getKind()) {
 +            case CORREL_VARIABLE:
 +                if (correlates == null) {
 +                    throw new RuntimeException("Cannot translate " + fieldAccess
 +                        + " since correlate variables resolver is not defined");
 +                }
 +                final RexToLixTranslator.InputGetter getter =
 +                    correlates.apply(((RexCorrelVariable)target).getName());
 +                final Expression input = getter.field(
 +                    list, fieldIndex, currentStorageType);
 +                final Expression condition = checkNull(input);
 +                final ParameterExpression valueVariable =
 +                    Expressions.parameter(input.getType(), list.newName("corInp_value"));
 +                list.add(Expressions.declare(Modifier.FINAL, valueVariable, input));
 +                final ParameterExpression isNullVariable =
 +                    Expressions.parameter(Boolean.TYPE, list.newName("corInp_isNull"));
 +                final Expression isNullExpression = Expressions.condition(
 +                    condition,
 +                    RexImpTable.TRUE_EXPR,
 +                    checkNull(valueVariable));
 +                list.add(Expressions.declare(Modifier.FINAL, isNullVariable, isNullExpression));
 +                final Result result1 = new Result(isNullVariable, valueVariable);
 +                rexWithStorageTypeResultMap.put(key, result1);
 +                return result1;
 +            default:
 +                RexNode rxIndex =
 +                    builder.makeLiteral(fieldIndex, typeFactory.createType(int.class), true);
 +                RexNode rxName =
 +                    builder.makeLiteral(fieldName, typeFactory.createType(String.class), true);
 +                RexCall accessCall = (RexCall)builder.makeCall(
 +                    fieldAccess.getType(), SqlStdOperatorTable.STRUCT_ACCESS,
 +                    ImmutableList.of(target, rxIndex, rxName));
 +                final Result result2 = accessCall.accept(this);
 +                rexWithStorageTypeResultMap.put(key, result2);
 +                return result2;
 +        }
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public Result visitOver(RexOver over) {
 +        throw new RuntimeException("cannot translate expression " + over);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public Result visitCorrelVariable(RexCorrelVariable correlVariable) {
 +        throw new RuntimeException("Cannot translate " + correlVariable
 +            + ". Correlated variables should always be referenced by field access");
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public Result visitRangeRef(RexRangeRef rangeRef) {
 +        throw new RuntimeException("cannot translate expression " + rangeRef);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public Result visitSubQuery(RexSubQuery subQuery) {
 +        throw new RuntimeException("cannot translate expression " + subQuery);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public Result visitTableInputRef(RexTableInputRef fieldRef) {
 +        throw new RuntimeException("cannot translate expression " + fieldRef);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public Result visitPatternFieldRef(RexPatternFieldRef fieldRef) {
 +        return visitInputRef(fieldRef);
 +    }
 +
 +    /** */
 +    Expression checkNull(Expression expr) {
 +        if (Primitive.flavor(expr.getType()) == Primitive.Flavor.PRIMITIVE)
 +            return RexImpTable.FALSE_EXPR;
 +
 +        return Expressions.equal(expr, RexImpTable.NULL_EXPR);
 +    }
 +
 +    /** */
 +    Expression checkNotNull(Expression expr) {
 +        if (Primitive.flavor(expr.getType()) == Primitive.Flavor.PRIMITIVE)
 +            return RexImpTable.TRUE_EXPR;
 +
 +        return Expressions.notEqual(expr, RexImpTable.NULL_EXPR);
 +    }
 +
 +    /** */
 +    BlockBuilder getBlockBuilder() {
 +        return list;
 +    }
 +
 +    /** */
 +    Expression getLiteral(Expression literalVariable) {
 +        return literalMap.get(literalVariable);
 +    }
 +
 +    /** Returns the value of a literal. */
 +    Object getLiteralValue(Expression expr) {
 +        if (expr instanceof ParameterExpression) {
 +            final Expression constantExpr = literalMap.get(expr);
 +            return getLiteralValue(constantExpr);
 +        }
 +        if (expr instanceof ConstantExpression)
 +            return ((ConstantExpression)expr).value;
 +
 +        return null;
 +    }
 +
 +    /** */
 +    List<Result> getCallOperandResult(RexCall call) {
 +        return callOperandResultMap.get(call);
 +    }
 +
 +    /** Translates a field of an input to an expression. */
 +    public interface InputGetter {
 +        /** */
 +        Expression field(BlockBuilder list, int index, Type storageType);
 +    }
 +
 +    /** Result of translating a {@code RexNode}. */
 +    public static class Result {
 +        /** */
 +        final ParameterExpression isNullVariable;
 +
 +        /** */
 +        final ParameterExpression valueVariable;
 +
 +        /** */
 +        public Result(ParameterExpression isNullVariable,
 +            ParameterExpression valueVariable) {
 +            this.isNullVariable = isNullVariable;
 +            this.valueVariable = valueVariable;
 +        }
 +    }
 +
 +    /**
 +     * Handle checked Exceptions declared in Method. In such case,
 +     * method call should be wrapped in a try...catch block.
 +     * "
 +     *      final Type method_call;
 +     *      try {
 +     *        method_call = callExpr
 +     *      } catch (Exception e) {
 +     *        throw new RuntimeException(e);
 +     *      }
 +     * "
 +     */
 +    Expression handleMethodCheckedExceptions(Expression callExpr) {
 +        // Try statement
 +        ParameterExpression methodCall = Expressions.parameter(
 +            callExpr.getType(), list.newName("method_call"));
 +        list.add(Expressions.declare(Modifier.FINAL, methodCall, null));
 +        Statement st = Expressions.statement(Expressions.assign(methodCall, callExpr));
 +        // Catch Block, wrap checked exception in unchecked exception
 +        ParameterExpression e = Expressions.parameter(0, Exception.class, "e");
 +        Expression uncheckedException = Expressions.new_(RuntimeException.class, e);
 +        CatchBlock cb = Expressions.catch_(e, Expressions.throw_(uncheckedException));
 +        list.add(Expressions.tryCatch(st, cb));
 +        return methodCall;
 +    }
 +}
diff --cc modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/ErrorMessage.java
index 45b4385,ed995dc..2ea310f
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/ErrorMessage.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/ErrorMessage.java
@@@ -121,8 -132,8 +121,8 @@@ public class ErrorMessage implements Ma
              return false;
  
          switch (reader.state()) {
-              case 0:
+             case 0:
 -                colNames = reader.readCollection("colNames", MessageCollectionItemType.STRING);
 +                errBytes = reader.readByteArray("errBytes");
  
                  if (!reader.isLastRead())
                      return false;
diff --cc modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdRowCount.java
index dd96b7e,0000000..1d92923
mode 100644,000000..100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdRowCount.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdRowCount.java
@@@ -1,146 -1,0 +1,146 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.ignite.internal.processors.query.calcite.metadata;
 +
 +import org.apache.calcite.rel.core.Intersect;
 +import org.apache.calcite.rel.core.Join;
 +import org.apache.calcite.rel.core.JoinInfo;
 +import org.apache.calcite.rel.core.JoinRelType;
 +import org.apache.calcite.rel.core.Minus;
 +import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
 +import org.apache.calcite.rel.metadata.RelMdRowCount;
 +import org.apache.calcite.rel.metadata.RelMdUtil;
 +import org.apache.calcite.rel.metadata.RelMetadataProvider;
 +import org.apache.calcite.rel.metadata.RelMetadataQuery;
 +import org.apache.calcite.rex.RexNode;
 +import org.apache.calcite.util.BuiltInMethod;
 +import org.apache.calcite.util.ImmutableBitSet;
 +import org.apache.calcite.util.ImmutableIntList;
 +import org.apache.calcite.util.Util;
 +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteAggregate;
 +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteLimit;
 +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSortedIndexSpool;
 +import org.apache.ignite.internal.util.typedef.F;
 +import org.jetbrains.annotations.Nullable;
 +
 +import static org.apache.calcite.util.NumberUtil.multiply;
 +
 +/** */
 +@SuppressWarnings("unused") // actually all methods are used by runtime generated classes
 +public class IgniteMdRowCount extends RelMdRowCount {
 +    /** */
 +    public static final RelMetadataProvider SOURCE =
 +        ReflectiveRelMetadataProvider.reflectiveSource(
 +            BuiltInMethod.ROW_COUNT.method, new IgniteMdRowCount());
 +
 +    /** {@inheritDoc} */
 +    @Override public Double getRowCount(Join rel, RelMetadataQuery mq) {
 +        return rel.estimateRowCount(mq);
 +    }
 +
 +    /** */
 +    @Nullable public static Double joinRowCount(RelMetadataQuery mq, Join rel) {
 +        if (!rel.getJoinType().projectsRight()) {
-           // Create a RexNode representing the selectivity of the
-           // semijoin filter and pass it to getSelectivity
-           RexNode semiJoinSelectivity =
-               RelMdUtil.makeSemiJoinSelectivityRexNode(mq, rel);
++            // Create a RexNode representing the selectivity of the
++            // semijoin filter and pass it to getSelectivity
++            RexNode semiJoinSelectivity =
++                RelMdUtil.makeSemiJoinSelectivityRexNode(mq, rel);
 +
-           return multiply(mq.getSelectivity(rel.getLeft(), semiJoinSelectivity),
-               mq.getRowCount(rel.getLeft()));
++            return multiply(mq.getSelectivity(rel.getLeft(), semiJoinSelectivity),
++                mq.getRowCount(rel.getLeft()));
 +        }
 +
 +        // Row count estimates of 0 will be rounded up to 1.
 +        // So, use maxRowCount where the product is very small.
 +        final Double left = mq.getRowCount(rel.getLeft());
 +        final Double right = mq.getRowCount(rel.getRight());
 +
 +        if (left == null || right == null)
 +            return null;
 +
 +        if (left <= 1D || right <= 1D) {
 +            Double max = mq.getMaxRowCount(rel);
 +            if (max != null && max <= 1D)
 +                return max;
 +        }
 +
 +        JoinInfo joinInfo = rel.analyzeCondition();
 +
 +        ImmutableIntList leftKeys = joinInfo.leftKeys;
 +        ImmutableIntList rightKeys = joinInfo.rightKeys;
 +
 +        double selectivity = mq.getSelectivity(rel, rel.getCondition());
 +
 +        if (F.isEmpty(leftKeys) || F.isEmpty(rightKeys))
 +            return left * right * selectivity;
 +
 +        double leftDistinct = Util.first(
 +            mq.getDistinctRowCount(rel.getLeft(), ImmutableBitSet.of(leftKeys), null), left);
 +        double rightDistinct = Util.first(
 +            mq.getDistinctRowCount(rel.getRight(), ImmutableBitSet.of(rightKeys), null), right);
 +
 +        double leftCardinality = leftDistinct / left;
 +        double rightCardinality = rightDistinct / right;
 +
 +        double rowsCount = (Math.min(left, right) / (leftCardinality * rightCardinality)) * selectivity;
 +
 +        JoinRelType type = rel.getJoinType();
 +
 +        if (type == JoinRelType.LEFT)
 +            rowsCount += left;
 +        else if (type == JoinRelType.RIGHT)
 +            rowsCount += right;
 +        else if (type == JoinRelType.FULL)
 +            rowsCount += left + right;
 +
 +        return rowsCount;
 +    }
 +
 +    /**
 +     * RowCount of Spool equals to estimated row count of its child by default,
 +     * but IndexSpool has internal filter that could filter out some rows,
 +     * hence we need to estimate it differently.
 +     */
 +    public double getRowCount(IgniteSortedIndexSpool rel, RelMetadataQuery mq) {
 +        return rel.estimateRowCount(mq);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public Double getRowCount(Intersect rel, RelMetadataQuery mq) {
 +        return rel.estimateRowCount(mq);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public Double getRowCount(Minus rel, RelMetadataQuery mq) {
 +        return rel.estimateRowCount(mq);
 +    }
 +
 +    /**
 +     * Estimation of row count for Aggregate operator.
 +     */
 +    public double getRowCount(IgniteAggregate rel, RelMetadataQuery mq) {
 +        return rel.estimateRowCount(mq);
 +    }
 +
 +    /**
 +     * Estimation of row count for Limit operator.
 +     */
 +    public double getRowCount(IgniteLimit rel, RelMetadataQuery mq) {
 +        return rel.estimateRowCount(mq);
 +    }
 +}
diff --cc modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/cost/IgniteCost.java
index 84d40f3,0000000..babfb81
mode 100644,000000..100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/cost/IgniteCost.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/cost/IgniteCost.java
@@@ -1,234 -1,0 +1,232 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.ignite.internal.processors.query.calcite.metadata.cost;
 +
 +import java.util.Objects;
 +
 +import org.apache.calcite.plan.RelOptCost;
 +import org.apache.calcite.plan.RelOptUtil;
 +import org.apache.calcite.rel.RelNode;
 +import org.apache.ignite.internal.util.typedef.internal.S;
 +
 +/**
 + * Estimated execution cost of a {@link RelNode}. Measured in abstract points.
 + */
 +public class IgniteCost implements RelOptCost {
 +    /** Cost of a passing a single row through an execution node. */
 +    public static final double ROW_PASS_THROUGH_COST = 1;
 +
 +    /** Size of a particular field. */
 +    public static final double AVERAGE_FIELD_SIZE = 4; // such accuracy should be enough for an estimate
 +
 +    /** Cost of a comparison of one row. */
 +    public static final double ROW_COMPARISON_COST = 3;
 +
 +    /** Memory cost of a aggregate call. */
 +    public static final double AGG_CALL_MEM_COST = 5;
 +
 +    /** Cost of a lookup at the hash. */
 +    public static final double HASH_LOOKUP_COST = 10;
 +
 +    /**
 +     * With broadcast distribution each row will be sent to the each distination node,
 +     * thus the total bytes amount will be multiplies of the destination nodes count.
 +     * Right now it's just a const.
 +     */
 +    public static final double BROADCAST_DISTRIBUTION_PENALTY = 5;
 +
 +    /** */
 +    static final IgniteCost ZERO = new IgniteCost(0, 0, 0, 0, 0);
 +
 +    /** */
 +    static final IgniteCost TINY = new IgniteCost(1, 1, 1, 1, 1);
 +
 +    /** */
 +    static final IgniteCost HUGE = new IgniteCost(
 +        Double.MAX_VALUE,
 +        Double.MAX_VALUE,
 +        Double.MAX_VALUE,
 +        Double.MAX_VALUE,
 +        Double.MAX_VALUE
 +    );
 +
 +    /** */
 +    static final IgniteCost INFINITY = new IgniteCost(
 +        Double.POSITIVE_INFINITY,
 +        Double.POSITIVE_INFINITY,
 +        Double.POSITIVE_INFINITY,
 +        Double.POSITIVE_INFINITY,
 +        Double.POSITIVE_INFINITY
 +    );
 +
 +    /** Count of the processed rows. */
 +    private final double rowCount;
 +
 +    /** Amount of CPU points. */
 +    private final double cpu;
 +
 +    /** Amount of Memory points. */
 +    private final double memory;
 +
 +    /** Amount of IO points. */
 +    private final double io;
 +
 +    /** Amount of Network points. */
 +    private final double network;
 +
 +    /**
 +     * @param rowCount Row count.
 +     * @param cpu Cpu.
 +     * @param memory Memory.
 +     * @param io Io.
 +     * @param network Network.
 +     */
 +    IgniteCost(double rowCount, double cpu, double memory, double io, double network) {
 +        this.rowCount = rowCount;
 +        this.cpu = cpu;
 +        this.memory = memory;
 +        this.io = io;
 +        this.network = network;
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public double getRows() {
 +        return rowCount;
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public double getCpu() {
 +        return cpu;
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public double getIo() {
 +        return io;
 +    }
 +
 +    /**
 +     * @return Usage of Memory resources.
 +     */
 +    public double getMemory() {
 +        return memory;
 +    }
 +
 +    /**
 +     * @return Usage of Network resources.
 +     */
 +    public double getNetwork() {
 +        return network;
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public boolean isInfinite() {
 +        return this == INFINITY
 +            || rowCount == Double.POSITIVE_INFINITY
 +            || cpu == Double.POSITIVE_INFINITY
 +            || memory == Double.POSITIVE_INFINITY
 +            || io == Double.POSITIVE_INFINITY
 +            || network == Double.POSITIVE_INFINITY;
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public int hashCode() {
 +        return Objects.hash(rowCount, cpu, io, memory, network);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @SuppressWarnings("FloatingPointEquality")
 +    @Override public boolean equals(RelOptCost cost) {
 +        return this == cost || (cost instanceof IgniteCost
 +            && rowCount == ((IgniteCost)cost).rowCount
 +            && cpu == ((IgniteCost)cost).cpu
 +            && memory == ((IgniteCost)cost).memory
 +            && io == ((IgniteCost)cost).io
-             && network == ((IgniteCost)cost).network
-         );
++            && network == ((IgniteCost)cost).network);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public boolean isEqWithEpsilon(RelOptCost cost) {
 +        return this == cost || (cost instanceof IgniteCost
 +            && Math.abs(rowCount - ((IgniteCost)cost).rowCount) < RelOptUtil.EPSILON
 +            && Math.abs(cpu - ((IgniteCost)cost).cpu) < RelOptUtil.EPSILON
 +            && Math.abs(memory - ((IgniteCost)cost).memory) < RelOptUtil.EPSILON
 +            && Math.abs(io - ((IgniteCost)cost).io) < RelOptUtil.EPSILON
-             && Math.abs(network - ((IgniteCost)cost).network) < RelOptUtil.EPSILON
-         );
++            && Math.abs(network - ((IgniteCost)cost).network) < RelOptUtil.EPSILON);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public boolean isLe(RelOptCost cost) {
 +        IgniteCost other = (IgniteCost)cost;
 +
 +        return this == cost || (cpu + memory + io + network) <= (other.cpu + other.memory + other.io + other.network);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public boolean isLt(RelOptCost cost) {
 +        IgniteCost other = (IgniteCost)cost;
 +
 +        return this != cost && (cpu + memory + io + network) < (other.cpu + other.memory + other.io + other.network);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public RelOptCost plus(RelOptCost cost) {
 +        IgniteCost other = (IgniteCost)cost;
 +
 +        return new IgniteCost(
 +            rowCount + other.rowCount,
 +            cpu + other.cpu,
 +            memory + other.memory,
 +            io + other.io,
 +            network + other.network
 +        );
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public RelOptCost minus(RelOptCost cost) {
 +        IgniteCost other = (IgniteCost)cost;
 +
 +        return new IgniteCost(
 +            rowCount - other.rowCount,
 +            cpu - other.cpu,
 +            memory - other.memory,
 +            io - other.io,
 +            network - other.network
 +        );
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public RelOptCost multiplyBy(double factor) {
 +        return new IgniteCost(
 +            rowCount * factor,
 +            cpu * factor,
 +            memory * factor,
 +            io * factor,
 +            network * factor
 +        );
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public double divideBy(RelOptCost cost) {
 +        throw new UnsupportedOperationException(IgniteCost.class.getSimpleName() + "#divideBy");
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public String toString() {
 +        return S.toString(IgniteCost.class, this);
 +    }
 +}
diff --cc modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteSqlToRelConvertor.java
index 7963a67,0000000..a61dffa
mode 100644,000000..100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteSqlToRelConvertor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteSqlToRelConvertor.java
@@@ -1,168 -1,0 +1,168 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.ignite.internal.processors.query.calcite.prepare;
 +
 +import java.util.ArrayList;
 +import java.util.List;
 +import org.apache.calcite.plan.RelOptCluster;
 +import org.apache.calcite.plan.RelOptTable;
 +import org.apache.calcite.prepare.Prepare;
 +import org.apache.calcite.rel.RelNode;
 +import org.apache.calcite.rel.RelRoot;
 +import org.apache.calcite.rel.core.JoinRelType;
 +import org.apache.calcite.rel.logical.LogicalJoin;
 +import org.apache.calcite.rel.logical.LogicalProject;
 +import org.apache.calcite.rel.logical.LogicalTableModify;
 +import org.apache.calcite.rel.type.RelDataType;
 +import org.apache.calcite.rel.type.RelDataTypeField;
 +import org.apache.calcite.rex.RexInputRef;
 +import org.apache.calcite.rex.RexNode;
 +import org.apache.calcite.sql.SqlIdentifier;
 +import org.apache.calcite.sql.SqlInsert;
 +import org.apache.calcite.sql.SqlKind;
 +import org.apache.calcite.sql.SqlMerge;
 +import org.apache.calcite.sql.SqlNode;
 +import org.apache.calcite.sql.SqlUpdate;
 +import org.apache.calcite.sql.validate.SqlValidator;
 +import org.apache.calcite.sql.validate.SqlValidatorUtil;
 +import org.apache.calcite.sql2rel.SqlRexConvertletTable;
 +import org.apache.calcite.sql2rel.SqlToRelConverter;
 +import org.apache.calcite.tools.RelBuilder;
 +import org.checkerframework.checker.nullness.qual.Nullable;
 +
 +import static java.util.Objects.requireNonNull;
 +
 +/**
 + * Converts a SQL parse tree into a relational algebra operators.
 + */
 +public class IgniteSqlToRelConvertor extends SqlToRelConverter {
 +    /** */
 +    public IgniteSqlToRelConvertor(
 +        RelOptTable.ViewExpander viewExpander,
 +        @Nullable SqlValidator validator,
 +        Prepare.CatalogReader catalogReader, RelOptCluster cluster,
 +        SqlRexConvertletTable convertletTable,
 +        Config cfg
 +    ) {
 +        super(viewExpander, validator, catalogReader, cluster, convertletTable, cfg);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override protected RelRoot convertQueryRecursive(SqlNode qry, boolean top, @Nullable RelDataType targetRowType) {
-             if (qry.getKind() == SqlKind.MERGE)
-                 return RelRoot.of(convertMerge((SqlMerge)qry), qry.getKind());
-             else
-                 return super.convertQueryRecursive(qry, top, targetRowType);
++        if (qry.getKind() == SqlKind.MERGE)
++            return RelRoot.of(convertMerge((SqlMerge)qry), qry.getKind());
++        else
++            return super.convertQueryRecursive(qry, top, targetRowType);
 +    }
 +
 +    /**
 +     * This method was copy-pasted from super-method except this changes:
 +     * - For updateCall we require all columns in the project and should not skip anything.
 +     * - If there is no updateCall, LEFT JOIN converted to ANTI JOIN.
 +     */
 +    private RelNode convertMerge(SqlMerge call) {
 +        RelOptTable targetTable = getTargetTable(call);
 +
 +        // convert update column list from SqlIdentifier to String
 +        final List<String> targetColumnNameList = new ArrayList<>();
 +        final RelDataType targetRowType = targetTable.getRowType();
 +        SqlUpdate updateCall = call.getUpdateCall();
 +        if (updateCall != null) {
 +            for (SqlNode targetColumn : updateCall.getTargetColumnList()) {
 +                SqlIdentifier id = (SqlIdentifier)targetColumn;
 +                RelDataTypeField field =
 +                    SqlValidatorUtil.getTargetField(
 +                        targetRowType, typeFactory, id, catalogReader, targetTable);
 +                assert field != null : "column " + id.toString() + " not found";
 +                targetColumnNameList.add(field.getName());
 +            }
 +        }
 +
 +        // replace the projection of the source select with a
 +        // projection that contains the following:
 +        // 1) the expressions corresponding to the new insert row (if there is
 +        //    an insert)
 +        // 2) all columns from the target table (if there is an update)
 +        // 3) the set expressions in the update call (if there is an update)
 +
 +        // first, convert the merge's source select to construct the columns
 +        // from the target table and the set expressions in the update call
 +        RelNode mergeSourceRel = convertSelect(
 +            requireNonNull(call.getSourceSelect(), () -> "sourceSelect for " + call), false);
 +
 +        // then, convert the insert statement so we can get the insert
 +        // values expressions
 +        SqlInsert insertCall = call.getInsertCall();
 +        int nLevel1Exprs = 0;
 +        List<RexNode> level1InsertExprs = null;
 +        List<RexNode> level2InsertExprs = null;
 +        if (insertCall != null) {
 +            RelNode insertRel = convertInsert(insertCall);
 +
 +            // if there are 2 level of projections in the insert source, combine
 +            // them into a single project; level1 refers to the topmost project;
 +            // the level1 projection contains references to the level2
 +            // expressions, except in the case where no target expression was
 +            // provided, in which case, the expression is the default value for
 +            // the column; or if the expressions directly map to the source
 +            // table
 +            level1InsertExprs =
 +                ((LogicalProject)insertRel.getInput(0)).getProjects();
 +            if (insertRel.getInput(0).getInput(0) instanceof LogicalProject) {
 +                level2InsertExprs =
 +                    ((LogicalProject)insertRel.getInput(0).getInput(0))
 +                        .getProjects();
 +            }
 +            nLevel1Exprs = level1InsertExprs.size();
 +        }
 +
 +        LogicalJoin join = (LogicalJoin)mergeSourceRel.getInput(0);
 +
 +        final List<RexNode> projects = new ArrayList<>();
 +
 +        for (int level1Idx = 0; level1Idx < nLevel1Exprs; level1Idx++) {
 +            requireNonNull(level1InsertExprs, "level1InsertExprs");
 +            if ((level2InsertExprs != null)
 +                && (level1InsertExprs.get(level1Idx) instanceof RexInputRef)) {
 +                int level2Idx =
 +                    ((RexInputRef)level1InsertExprs.get(level1Idx)).getIndex();
 +                projects.add(level2InsertExprs.get(level2Idx));
 +            } else
 +                projects.add(level1InsertExprs.get(level1Idx));
 +        }
 +        if (updateCall != null) {
 +            final LogicalProject project = (LogicalProject)mergeSourceRel;
 +            projects.addAll(project.getProjects());
 +        }
 +        else {
 +            // Convert to ANTI join if there is no UPDATE clause.
 +            join = join.copy(join.getTraitSet(), join.getCondition(), join.getLeft(), join.getRight(), JoinRelType.ANTI,
 +                false);
 +        }
 +
 +        RelBuilder relBuilder = config.getRelBuilderFactory().create(cluster, null)
 +            .transform(config.getRelBuilderConfigTransform());
 +
 +        relBuilder.push(join)
 +            .project(projects);
 +
 +        return LogicalTableModify.create(targetTable, catalogReader,
 +            relBuilder.build(), LogicalTableModify.Operation.MERGE,
 +            targetColumnNameList, null, false);
 +    }
 +}
diff --cc modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/ddl/DdlSqlToCommandConverter.java
index 5dd8ade,0000000..371100b
mode 100644,000000..100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/ddl/DdlSqlToCommandConverter.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/ddl/DdlSqlToCommandConverter.java
@@@ -1,473 -1,0 +1,473 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.ignite.internal.processors.query.calcite.prepare.ddl;
 +
 +import java.lang.reflect.Type;
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Set;
 +import java.util.function.BiConsumer;
 +import java.util.function.BiFunction;
 +import java.util.function.Function;
 +import java.util.stream.Collectors;
 +import java.util.stream.Stream;
 +import org.apache.calcite.DataContext;
 +import org.apache.calcite.rel.type.RelDataType;
 +import org.apache.calcite.rel.type.RelDataTypeField;
 +import org.apache.calcite.sql.SqlDdl;
 +import org.apache.calcite.sql.SqlIdentifier;
 +import org.apache.calcite.sql.SqlInsert;
 +import org.apache.calcite.sql.SqlLiteral;
 +import org.apache.calcite.sql.SqlNode;
 +import org.apache.calcite.sql.SqlNodeList;
 +import org.apache.calcite.sql.SqlNumericLiteral;
 +import org.apache.calcite.sql.ddl.SqlColumnDeclaration;
 +import org.apache.calcite.sql.ddl.SqlDropTable;
 +import org.apache.calcite.sql.ddl.SqlKeyConstraint;
 +import org.apache.ignite.cache.CacheAtomicityMode;
 +import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 +import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
 +import org.apache.ignite.internal.processors.query.IgniteSQLException;
 +import org.apache.ignite.internal.processors.query.QueryUtils;
 +import org.apache.ignite.internal.processors.query.calcite.prepare.BaseDataContext;
 +import org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePlanner;
 +import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
 +import org.apache.ignite.internal.processors.query.calcite.prepare.ValidationResult;
 +import org.apache.ignite.internal.processors.query.calcite.sql.IgniteSqlAlterTableAddColumn;
 +import org.apache.ignite.internal.processors.query.calcite.sql.IgniteSqlAlterTableDropColumn;
 +import org.apache.ignite.internal.processors.query.calcite.sql.IgniteSqlCommit;
 +import org.apache.ignite.internal.processors.query.calcite.sql.IgniteSqlCreateTable;
 +import org.apache.ignite.internal.processors.query.calcite.sql.IgniteSqlCreateTableOption;
 +import org.apache.ignite.internal.processors.query.calcite.sql.IgniteSqlCreateTableOptionEnum;
 +import org.apache.ignite.internal.processors.query.calcite.sql.IgniteSqlRollback;
 +import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
 +import org.apache.ignite.internal.util.typedef.F;
 +
 +import static org.apache.calcite.sql.type.SqlTypeName.BOOLEAN;
 +import static org.apache.ignite.internal.processors.query.calcite.sql.IgniteSqlCreateTableOptionEnum.AFFINITY_KEY;
 +import static org.apache.ignite.internal.processors.query.calcite.sql.IgniteSqlCreateTableOptionEnum.ATOMICITY;
 +import static org.apache.ignite.internal.processors.query.calcite.sql.IgniteSqlCreateTableOptionEnum.BACKUPS;
 +import static org.apache.ignite.internal.processors.query.calcite.sql.IgniteSqlCreateTableOptionEnum.CACHE_GROUP;
 +import static org.apache.ignite.internal.processors.query.calcite.sql.IgniteSqlCreateTableOptionEnum.CACHE_NAME;
 +import static org.apache.ignite.internal.processors.query.calcite.sql.IgniteSqlCreateTableOptionEnum.DATA_REGION;
 +import static org.apache.ignite.internal.processors.query.calcite.sql.IgniteSqlCreateTableOptionEnum.ENCRYPTED;
 +import static org.apache.ignite.internal.processors.query.calcite.sql.IgniteSqlCreateTableOptionEnum.KEY_TYPE;
 +import static org.apache.ignite.internal.processors.query.calcite.sql.IgniteSqlCreateTableOptionEnum.TEMPLATE;
 +import static org.apache.ignite.internal.processors.query.calcite.sql.IgniteSqlCreateTableOptionEnum.VALUE_TYPE;
 +import static org.apache.ignite.internal.processors.query.calcite.sql.IgniteSqlCreateTableOptionEnum.WRITE_SYNCHRONIZATION_MODE;
 +import static org.apache.ignite.internal.processors.query.calcite.util.PlanUtils.deriveObjectName;
 +import static org.apache.ignite.internal.processors.query.calcite.util.PlanUtils.deriveSchemaName;
 +
 +/** */
 +public class DdlSqlToCommandConverter {
 +    /** Processor that validates a value is a Sql Identifier. */
 +    private static final BiFunction<IgniteSqlCreateTableOption, PlanningContext, String> VALUE_IS_IDENTIFIER_VALIDATOR = (opt, ctx) -> {
 +        if (!(opt.value() instanceof SqlIdentifier) || !((SqlIdentifier)opt.value()).isSimple())
 +            throwOptionParsingException(opt, "a simple identifier", ctx.query());
 +
 +        return ((SqlIdentifier)opt.value()).getSimple();
 +    };
 +
 +    /** Processor that unconditionally throws an AssertionException. */
 +    private static final TableOptionProcessor<Void> UNSUPPORTED_OPTION_PROCESSOR = new TableOptionProcessor<>(
 +        null,
 +        (opt, ctx) -> {
 +            throw new AssertionError("Unsupported option " + opt.key());
 +        },
 +        null);
 +
 +    /** Map of the supported table option processors. */
 +    private final Map<IgniteSqlCreateTableOptionEnum, TableOptionProcessor<?>> tblOptionProcessors = Stream.of(
 +        new TableOptionProcessor<>(TEMPLATE, VALUE_IS_IDENTIFIER_VALIDATOR, CreateTableCommand::templateName),
 +        new TableOptionProcessor<>(AFFINITY_KEY, VALUE_IS_IDENTIFIER_VALIDATOR, CreateTableCommand::affinityKey),
 +        new TableOptionProcessor<>(CACHE_GROUP, VALUE_IS_IDENTIFIER_VALIDATOR, CreateTableCommand::cacheGroup),
 +        new TableOptionProcessor<>(CACHE_NAME, VALUE_IS_IDENTIFIER_VALIDATOR, CreateTableCommand::cacheName),
 +        new TableOptionProcessor<>(DATA_REGION, VALUE_IS_IDENTIFIER_VALIDATOR, CreateTableCommand::dataRegionName),
 +        new TableOptionProcessor<>(KEY_TYPE, VALUE_IS_IDENTIFIER_VALIDATOR, CreateTableCommand::keyTypeName),
 +        new TableOptionProcessor<>(VALUE_TYPE, VALUE_IS_IDENTIFIER_VALIDATOR, CreateTableCommand::valueTypeName),
 +        new TableOptionProcessor<>(ATOMICITY, validatorForEnumValue(CacheAtomicityMode.class), CreateTableCommand::atomicityMode),
 +        new TableOptionProcessor<>(WRITE_SYNCHRONIZATION_MODE, validatorForEnumValue(CacheWriteSynchronizationMode.class),
 +            CreateTableCommand::writeSynchronizationMode),
 +        new TableOptionProcessor<>(BACKUPS, (opt, ctx) -> {
-                 if (!(opt.value() instanceof SqlNumericLiteral)
-                     || !((SqlNumericLiteral)opt.value()).isInteger()
-                     || ((SqlLiteral)opt.value()).intValue(true) < 0
-                 )
-                     throwOptionParsingException(opt, "a non-negative integer", ctx.query());
++            if (!(opt.value() instanceof SqlNumericLiteral)
++                || !((SqlNumericLiteral)opt.value()).isInteger()
++                || ((SqlLiteral)opt.value()).intValue(true) < 0
++            )
++                throwOptionParsingException(opt, "a non-negative integer", ctx.query());
 +
-                 return ((SqlLiteral)opt.value()).intValue(true);
-             }, CreateTableCommand::backups),
++            return ((SqlLiteral)opt.value()).intValue(true);
++        }, CreateTableCommand::backups),
 +        new TableOptionProcessor<>(ENCRYPTED, (opt, ctx) -> {
 +            if (!(opt.value() instanceof SqlLiteral) && ((SqlLiteral)opt.value()).getTypeName() != BOOLEAN)
 +                throwOptionParsingException(opt, "a boolean", ctx.query());
 +
 +            return ((SqlLiteral)opt.value()).booleanValue();
 +        }, CreateTableCommand::encrypted)
 +        ).collect(Collectors.toMap(TableOptionProcessor::key, Function.identity()));
 +
 +    /**
 +     * Converts a given ddl AST to a ddl command.
 +     *
 +     * @param ddlNode Root node of the given AST.
 +     * @param ctx Planning context.
 +     */
 +    public DdlCommand convert(SqlDdl ddlNode, PlanningContext ctx) {
 +        if (ddlNode instanceof IgniteSqlCreateTable)
 +            return convertCreateTable((IgniteSqlCreateTable)ddlNode, ctx);
 +
 +        if (ddlNode instanceof SqlDropTable)
 +            return convertDropTable((SqlDropTable)ddlNode, ctx);
 +
 +        if (ddlNode instanceof IgniteSqlAlterTableAddColumn)
 +            return convertAlterTableAdd((IgniteSqlAlterTableAddColumn)ddlNode, ctx);
 +
 +        if (ddlNode instanceof IgniteSqlAlterTableDropColumn)
 +            return convertAlterTableDrop((IgniteSqlAlterTableDropColumn)ddlNode, ctx);
 +
 +        if (ddlNode instanceof IgniteSqlCommit || ddlNode instanceof IgniteSqlRollback)
 +            return new TransactionCommand();
 +
 +        if (SqlToNativeCommandConverter.isSupported(ddlNode))
 +            return SqlToNativeCommandConverter.convert(ddlNode, ctx);
 +
 +        throw new IgniteSQLException("Unsupported operation [" +
 +            "sqlNodeKind=" + ddlNode.getKind() + "; " +
 +            "querySql=\"" + ctx.query() + "\"]", IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
 +    }
 +
 +    /**
 +     * Converts a given CreateTable AST to a CreateTable command.
 +     *
 +     * @param createTblNode Root node of the given AST.
 +     * @param ctx Planning context.
 +     */
 +    private CreateTableCommand convertCreateTable(IgniteSqlCreateTable createTblNode, PlanningContext ctx) {
 +        CreateTableCommand createTblCmd = new CreateTableCommand();
 +
 +        String schemaName = deriveSchemaName(createTblNode.name(), ctx);
 +        String tableName = deriveObjectName(createTblNode.name(), ctx, "tableName");
 +
 +        createTblCmd.schemaName(schemaName);
 +        createTblCmd.tableName(tableName);
 +        createTblCmd.ifNotExists(createTblNode.ifNotExists());
 +        createTblCmd.templateName(QueryUtils.TEMPLATE_PARTITIONED);
 +
 +        if (createTblNode.createOptionList() != null) {
 +            for (SqlNode optNode : createTblNode.createOptionList().getList()) {
 +                IgniteSqlCreateTableOption opt = (IgniteSqlCreateTableOption)optNode;
 +
 +                tblOptionProcessors.getOrDefault(opt.key(), UNSUPPORTED_OPTION_PROCESSOR).process(opt, ctx, createTblCmd);
 +            }
 +        }
 +
 +        IgnitePlanner planner = ctx.planner();
 +
 +        if (createTblNode.query() == null) {
 +            List<SqlColumnDeclaration> colDeclarations = createTblNode.columnList().getList().stream()
 +                .filter(SqlColumnDeclaration.class::isInstance)
 +                .map(SqlColumnDeclaration.class::cast)
 +                .collect(Collectors.toList());
 +
 +            List<ColumnDefinition> cols = new ArrayList<>();
 +
 +            for (SqlColumnDeclaration col : colDeclarations) {
 +                if (!col.name.isSimple())
 +                    throw new IgniteSQLException("Unexpected value of columnName [" +
 +                        "expected a simple identifier, but was " + col.name + "; " +
 +                        "querySql=\"" + ctx.query() + "\"]", IgniteQueryErrorCode.PARSING);
 +
 +                String name = col.name.getSimple();
 +                RelDataType type = planner.convert(col.dataType);
 +
 +                Object dflt = null;
 +                if (col.expression != null) {
 +                    assert col.expression instanceof SqlLiteral;
 +
 +                    Type storageType = ctx.typeFactory().getResultClass(type);
 +
 +                    DataContext dataCtx = new BaseDataContext(ctx.typeFactory());
 +
 +                    dflt = TypeUtils.fromLiteral(dataCtx, storageType, (SqlLiteral)col.expression);
 +                }
 +
 +                cols.add(new ColumnDefinition(name, type, dflt));
 +            }
 +
 +            createTblCmd.columns(cols);
 +
 +            List<SqlKeyConstraint> pkConstraints = createTblNode.columnList().getList().stream()
 +                .filter(SqlKeyConstraint.class::isInstance)
 +                .map(SqlKeyConstraint.class::cast)
 +                .collect(Collectors.toList());
 +
 +            if (pkConstraints.size() > 1)
 +                throw new IgniteSQLException("Unexpected amount of primary key constraints [" +
 +                    "expected at most one, but was " + pkConstraints.size() + "; " +
 +                    "querySql=\"" + ctx.query() + "\"]", IgniteQueryErrorCode.PARSING);
 +
 +            if (!F.isEmpty(pkConstraints)) {
 +                Set<String> dedupSet = new HashSet<>();
 +
 +                List<String> pkCols = pkConstraints.stream()
 +                    .map(pk -> pk.getOperandList().get(1))
 +                    .map(SqlNodeList.class::cast)
 +                    .flatMap(l -> l.getList().stream())
 +                    .map(SqlIdentifier.class::cast)
 +                    .map(SqlIdentifier::getSimple)
 +                    .filter(dedupSet::add)
 +                    .collect(Collectors.toList());
 +
 +                createTblCmd.primaryKeyColumns(pkCols);
 +            }
 +        }
 +        else { // CREATE AS SELECT.
 +            ValidationResult res = planner.validateAndGetTypeMetadata(createTblNode.query());
 +
 +            // Create INSERT node on top of AS SELECT node.
 +            SqlInsert sqlInsert = new SqlInsert(
 +                createTblNode.query().getParserPosition(),
 +                SqlNodeList.EMPTY,
 +                createTblNode.name(),
 +                res.sqlNode(),
 +                null
 +            );
 +
 +            createTblCmd.insertStatement(sqlInsert);
 +
 +            List<RelDataTypeField> fields = res.dataType().getFieldList();
 +            List<ColumnDefinition> cols = new ArrayList<>(fields.size());
 +
 +            if (createTblNode.columnList() != null) {
 +                // Derive column names from the CREATE TABLE clause and column types from the query.
 +                List<SqlIdentifier> colNames = createTblNode.columnList().getList().stream()
 +                    .map(SqlIdentifier.class::cast)
 +                    .collect(Collectors.toList());
 +
 +                if (fields.size() != colNames.size()) {
 +                    throw new IgniteSQLException("Number of columns must match number of query columns",
 +                        IgniteQueryErrorCode.PARSING);
 +                }
 +
 +                for (int i = 0; i < colNames.size(); i++) {
 +                    SqlIdentifier colName = colNames.get(i);
 +
 +                    assert colName.isSimple();
 +
 +                    RelDataType type = fields.get(i).getType();
 +
 +                    cols.add(new ColumnDefinition(colName.getSimple(), type, null));
 +                }
 +            }
 +            else {
 +                // Derive column names and column types from the query.
 +                for (RelDataTypeField field : fields)
 +                    cols.add(new ColumnDefinition(field.getName(), field.getType(), null));
 +            }
 +
 +            createTblCmd.columns(cols);
 +        }
 +
 +        if (createTblCmd.columns() == null) {
 +            throw new IgniteSQLException("Column list or query should be specified for CREATE TABLE command",
 +                IgniteQueryErrorCode.PARSING);
 +        }
 +
 +        return createTblCmd;
 +    }
 +
 +    /**
 +     * Converts a given DropTable AST to a DropTable command.
 +     *
 +     * @param dropTblNode Root node of the given AST.
 +     * @param ctx Planning context.
 +     */
 +    private DropTableCommand convertDropTable(SqlDropTable dropTblNode, PlanningContext ctx) {
 +        DropTableCommand dropTblCmd = new DropTableCommand();
 +
 +        dropTblCmd.schemaName(deriveSchemaName(dropTblNode.name, ctx));
 +        dropTblCmd.tableName(deriveObjectName(dropTblNode.name, ctx, "tableName"));
 +        dropTblCmd.ifExists(dropTblNode.ifExists);
 +
 +        return dropTblCmd;
 +    }
 +
 +    /**
 +     * Converts a given IgniteSqlAlterTableAddColumn AST to a AlterTableAddCommand.
 +     *
 +     * @param alterTblNode Root node of the given AST.
 +     * @param ctx Planning context.
 +     */
 +    private AlterTableAddCommand convertAlterTableAdd(IgniteSqlAlterTableAddColumn alterTblNode, PlanningContext ctx) {
 +        AlterTableAddCommand alterTblCmd = new AlterTableAddCommand();
 +
 +        alterTblCmd.schemaName(deriveSchemaName(alterTblNode.name(), ctx));
 +        alterTblCmd.tableName(deriveObjectName(alterTblNode.name(), ctx, "table name"));
 +        alterTblCmd.ifTableExists(alterTblNode.ifExists());
 +        alterTblCmd.ifColumnNotExists(alterTblNode.ifNotExistsColumn());
 +
 +        List<ColumnDefinition> cols = new ArrayList<>(alterTblNode.columns().size());
 +
 +        for (SqlNode colNode : alterTblNode.columns()) {
 +            assert colNode instanceof SqlColumnDeclaration : colNode.getClass();
 +
 +            SqlColumnDeclaration col = (SqlColumnDeclaration)colNode;
 +
 +            assert col.name.isSimple();
 +
 +            String name = col.name.getSimple();
 +            RelDataType type = ctx.planner().convert(col.dataType);
 +
 +            assert col.expression == null : "Unexpected column default value" + col.expression;
 +
 +            cols.add(new ColumnDefinition(name, type, null));
 +        }
 +
 +        alterTblCmd.columns(cols);
 +
 +        return alterTblCmd;
 +    }
 +
 +    /**
 +     * Converts a given IgniteSqlAlterTableDropColumn AST to a AlterTableDropCommand.
 +     *
 +     * @param alterTblNode Root node of the given AST.
 +     * @param ctx Planning context.
 +     */
 +    private AlterTableDropCommand convertAlterTableDrop(IgniteSqlAlterTableDropColumn alterTblNode, PlanningContext ctx) {
 +        AlterTableDropCommand alterTblCmd = new AlterTableDropCommand();
 +
 +        alterTblCmd.schemaName(deriveSchemaName(alterTblNode.name(), ctx));
 +        alterTblCmd.tableName(deriveObjectName(alterTblNode.name(), ctx, "table name"));
 +        alterTblCmd.ifTableExists(alterTblNode.ifExists());
 +        alterTblCmd.ifColumnExists(alterTblNode.ifExistsColumn());
 +
 +        List<String> cols = new ArrayList<>(alterTblNode.columns().size());
 +        alterTblNode.columns().forEach(c -> cols.add(((SqlIdentifier)c).getSimple()));
 +
 +        alterTblCmd.columns(cols);
 +
 +        return alterTblCmd;
 +    }
 +
 +    /**
 +     * Short cut for validating that option value is a simple identifier.
 +     *
 +     * @param opt An option to validate.
 +     * @param ctx Planning context.
 +     * @throws IgniteSQLException In case the validation was failed.
 +     */
 +    private String paramIsSqlIdentifierValidator(IgniteSqlCreateTableOption opt, PlanningContext ctx) {
 +        if (!(opt.value() instanceof SqlIdentifier) || !((SqlIdentifier)opt.value()).isSimple())
 +            throwOptionParsingException(opt, "a simple identifier", ctx.query());
 +
 +        return ((SqlIdentifier)opt.value()).getSimple();
 +    }
 +
 +    /**
 +     * Creates a validator for an option which value should be value of given enumeration.
 +     *
 +     * @param clz Enumeration class to create validator for.
 +     */
 +    private static <T extends Enum<T>> BiFunction<IgniteSqlCreateTableOption, PlanningContext, T> validatorForEnumValue(
 +        Class<T> clz
 +    ) {
 +        return (opt, ctx) -> {
 +            T val = null;
 +
 +            if (opt.value() instanceof SqlIdentifier) {
 +                val = Arrays.stream(clz.getEnumConstants())
 +                    .filter(m -> m.name().equalsIgnoreCase(opt.value().toString()))
 +                    .findFirst()
 +                    .orElse(null);
 +            }
 +
 +            if (val == null)
 +                throwOptionParsingException(opt, "values are "
 +                    + Arrays.toString(clz.getEnumConstants()), ctx.query());
 +
 +            return val;
 +        };
 +    }
 +
 +    /**
 +     * Throws exception with message relates to validation of create table option.
 +     *
 +     * @param opt An option which validation was failed.
 +     * @param exp A string representing expected values.
 +     * @param qry A query the validation was failed for.
 +     */
 +    private static void throwOptionParsingException(IgniteSqlCreateTableOption opt, String exp, String qry) {
 +        throw new IgniteSQLException("Unexpected value for param " + opt.key() + " [" +
 +            "expected " + exp + ", but was " + opt.value() + "; " +
 +            "querySql=\"" + qry + "\"]", IgniteQueryErrorCode.PARSING);
 +    }
 +
 +    /** */
 +    private static class TableOptionProcessor<T> {
 +        /** */
 +        private final IgniteSqlCreateTableOptionEnum key;
 +
 +        /** */
 +        private final BiFunction<IgniteSqlCreateTableOption, PlanningContext, T> validator;
 +
 +        /** */
 +        private final BiConsumer<CreateTableCommand, T> valSetter;
 +
 +        /**
 +         * @param key Option key this processor is supopsed to handle.
 +         * @param validator Validator that derives a value from a {@link SqlNode},
 +         *                 validates it and then returns if validation passed,
 +         *                 throws an exeption otherwise.
 +         * @param valSetter Setter sets the value recived from the validator
 +         *                 to the given {@link CreateTableCommand}.
 +         */
 +        private TableOptionProcessor(
 +            IgniteSqlCreateTableOptionEnum key,
 +            BiFunction<IgniteSqlCreateTableOption, PlanningContext, T> validator,
 +            BiConsumer<CreateTableCommand, T> valSetter
 +        ) {
 +            this.key = key;
 +            this.validator = validator;
 +            this.valSetter = valSetter;
 +        }
 +
 +        /**
 +         * Processes the given option, validates it's value and then sets the appropriate
 +         * field in a given command, throws an exception if the validation failed.
 +         *
 +         * @param opt Option to validate.
 +         * @param ctx Planning context.
 +         * @param cmd Command instance to set a validation result.
 +         */
 +        private void process(IgniteSqlCreateTableOption opt, PlanningContext ctx, CreateTableCommand cmd) {
 +            assert key == null || key == opt.key() : "Unexpected create table option [expected=" + key + ", actual=" + opt.key() + "]";
 +
 +            valSetter.accept(cmd, validator.apply(opt, ctx));
 +        }
 +
 +        /**
 +         * @return Key this processor is supposed to handle.
 +         */
 +        private IgniteSqlCreateTableOptionEnum key() {
 +            return key;
 +        }
 +    }
 +}
diff --cc modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CancelTest.java
index 899747d,0000000..7727c53
mode 100644,000000..100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CancelTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CancelTest.java
@@@ -1,258 -1,0 +1,258 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.ignite.internal.processors.query.calcite;
 +
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.NoSuchElementException;
 +import org.apache.ignite.IgniteCache;
 +import org.apache.ignite.cache.CacheMode;
 +import org.apache.ignite.cache.QueryEntity;
 +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 +import org.apache.ignite.cache.query.FieldsQueryCursor;
 +import org.apache.ignite.cache.query.QueryCursor;
 +import org.apache.ignite.configuration.CacheConfiguration;
 +import org.apache.ignite.configuration.IgniteConfiguration;
 +import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 +import org.apache.ignite.internal.processors.query.IgniteSQLException;
 +import org.apache.ignite.internal.processors.query.QueryEngine;
 +import org.apache.ignite.internal.processors.query.calcite.metadata.RemoteException;
 +import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 +import org.apache.ignite.internal.util.typedef.X;
 +import org.apache.ignite.testframework.GridTestUtils;
 +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 +import org.junit.Assert;
 +import org.junit.Test;
 +
 +import static java.util.Collections.singletonList;
 +import static org.apache.ignite.cache.query.QueryCancelledException.ERR_MSG;
 +import static org.apache.ignite.internal.processors.query.calcite.QueryChecker.awaitReservationsRelease;
 +
 +/**
 + * Cancel query test.
 + */
 +public class CancelTest extends GridCommonAbstractTest {
 +    /** {@inheritDoc} */
 +    @Override protected void beforeTest() throws Exception {
 +        startGrids(2);
 +
 +        IgniteCache<Integer, String> c = grid(0).cache("TEST");
 +
 +        fillCache(c, 5000);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override protected void afterTest() throws Exception {
 +        stopAllGrids();
 +
 +        super.afterTest();
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
 +        QueryEntity ePart = new QueryEntity()
 +            .setTableName("TEST")
 +            .setKeyType(Integer.class.getName())
 +            .setValueType(String.class.getName())
 +            .setKeyFieldName("id")
 +            .setValueFieldName("val")
 +            .addQueryField("id", Integer.class.getName(), null)
 +            .addQueryField("val", String.class.getName(), null);;
 +
 +        return super.getConfiguration(igniteInstanceName)
 +            .setCacheConfiguration(
 +                new CacheConfiguration<>(ePart.getTableName())
 +                    .setAffinity(new RendezvousAffinityFunction(false, 8))
 +                    .setCacheMode(CacheMode.PARTITIONED)
 +                    .setQueryEntities(singletonList(ePart))
 +                    .setSqlSchema("PUBLIC"));
 +    }
 +
 +    /**
 +     *
 +     */
 +    @Test
 +    public void testCancel() throws Exception {
 +        QueryEngine engine = Commons.lookupComponent(grid(0).context(), QueryEngine.class);
 +
 +        List<FieldsQueryCursor<List<?>>> cursors =
 +            engine.query(null, "PUBLIC",
 +                "SELECT * FROM TEST",
 +                X.EMPTY_OBJECT_ARRAY);
 +
 +        Iterator<List<?>> it = cursors.get(0).iterator();
 +
 +        it.next();
 +
 +        cursors.forEach(QueryCursor::close);
 +
 +        GridTestUtils.assertThrows(log, () -> {
 +                it.next();
 +
 +                return null;
 +            },
 +            IgniteSQLException.class, ERR_MSG
 +        );
 +
 +        awaitReservationsRelease("TEST");
 +    }
 +
 +    /**
 +     *
 +     */
 +    @Test
 +    public void testNotOriginatorNodeStop() throws Exception {
 +        QueryEngine engine = Commons.lookupComponent(grid(0).context(), QueryEngine.class);
 +
 +        List<FieldsQueryCursor<List<?>>> cursors =
 +            engine.query(null, "PUBLIC",
 +                "SELECT * FROM TEST",
 +                X.EMPTY_OBJECT_ARRAY);
 +
 +        Iterator<List<?>> it = cursors.get(0).iterator();
 +
 +        it.next();
 +
 +        stopGrid(1);
 +
 +        Throwable ex = GridTestUtils.assertThrows(log, () -> {
-                 while (it.hasNext())
-                     it.next();
++            while (it.hasNext())
++                it.next();
 +
-                 return null;
-             }, IgniteSQLException.class, null);
++            return null;
++        }, IgniteSQLException.class, null);
 +
 +        // Sometimes remote node during stopping can send error to originator node and this error processed before
 +        // node left event, in this case exception stack will looks like:
 +        // IgniteSQLException -> RemoteException -> IgniteInterruptedCheckedException
 +        if (!X.hasCause(ex, "node left", ClusterTopologyCheckedException.class) && !(X.hasCause(ex,
 +            RemoteException.class) && X.hasCause(ex, IgniteInterruptedCheckedException.class))) {
 +            log.error("Unexpected exception", ex);
 +
 +            fail("Unexpected exception: " + ex);
 +        }
 +
 +        Assert.assertTrue(GridTestUtils.waitForCondition(
 +            () -> engine.runningQueries().isEmpty(), 10_000));
 +
 +        awaitReservationsRelease(grid(0), "TEST");
 +    }
 +
 +    /**
 +     *
 +     */
 +    @Test
 +    public void testOriginatorNodeStop() throws Exception {
 +        QueryEngine engine = Commons.lookupComponent(grid(0).context(), QueryEngine.class);
 +
 +        List<FieldsQueryCursor<List<?>>> cursors =
 +            engine.query(null, "PUBLIC",
 +                "SELECT * FROM TEST",
 +                X.EMPTY_OBJECT_ARRAY);
 +
 +        Iterator<List<?>> it = cursors.get(0).iterator();
 +
 +        it.next();
 +
 +        stopGrid(0);
 +
 +        QueryEngine engine1 = Commons.lookupComponent(grid(1).context(), QueryEngine.class);
 +
 +        Assert.assertTrue(GridTestUtils.waitForCondition(
 +            () -> engine1.runningQueries().isEmpty(), 10_000));
 +
 +        awaitReservationsRelease(grid(1), "TEST");
 +    }
 +
 +    /**
 +     *
 +     */
 +    @Test
 +    public void testReadToEnd() throws Exception {
 +        QueryEngine engine = Commons.lookupComponent(grid(0).context(), QueryEngine.class);
 +
 +        List<FieldsQueryCursor<List<?>>> cursors =
 +            engine.query(null, "PUBLIC",
 +                "SELECT * FROM TEST WHERE ID < 1",
 +                X.EMPTY_OBJECT_ARRAY);
 +
 +        Iterator<List<?>> it = cursors.get(0).iterator();
 +
 +        it.next();
 +
 +        GridTestUtils.assertThrows(log, () -> {
 +                it.next();
 +
 +                return null;
 +            },
 +            NoSuchElementException.class, null
 +        );
 +
 +        GridTestUtils.assertThrows(log, () -> {
 +                it.next();
 +
 +                return null;
 +            },
 +            NoSuchElementException.class, null
 +        );
 +
 +        // Checks that all partition are unreserved.
 +        awaitReservationsRelease("TEST");
 +    }
 +
 +    /**
 +     *
 +     */
 +    @Test
 +    public void testFullReadToEnd() throws Exception {
 +        QueryEngine engine = Commons.lookupComponent(grid(0).context(), QueryEngine.class);
 +
 +        List<FieldsQueryCursor<List<?>>> cursors =
 +            engine.query(null, "PUBLIC",
 +                "SELECT * FROM TEST WHERE ID < 1",
 +                X.EMPTY_OBJECT_ARRAY);
 +
 +        cursors.get(0).getAll();
 +
 +        // Checks that all partition are unreserved.
 +        awaitReservationsRelease("TEST");
 +    }
 +
 +    /**
 +     * @param c Cache.
 +     * @param rows Rows count.
 +     */
 +    private void fillCache(IgniteCache c, int rows) throws InterruptedException {
 +        c.clear();
 +
 +        for (int i = 0; i < rows; ++i)
 +            c.put(i, "val_" + i);
 +
 +        awaitPartitionMapExchange();
 +    }
 +
 +    /**
 +     *
 +     */
 +    private void startNewNode() throws Exception {
 +        startGrid(2);
 +
 +        awaitPartitionMapExchange();
 +    }
 +}
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java
index 1f42d21,0aca91b..5d2f6a2
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java
@@@ -67,10 -65,8 +65,9 @@@ import org.apache.ignite.plugin.extensi
  import org.apache.ignite.spi.systemview.view.StripedExecutorTaskView;
  import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor;
  import org.apache.ignite.thread.IgniteThreadPoolExecutor;
 +import org.apache.ignite.thread.SameThreadExecutor;
  import org.jetbrains.annotations.Nullable;
  
- import static java.util.concurrent.TimeUnit.MILLISECONDS;
  import static org.apache.ignite.configuration.IgniteConfiguration.DFLT_THREAD_KEEP_ALIVE_TIME;
  import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
  import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.SNAPSHOT_RUNNER_THREAD_PREFIX;
diff --cc modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteGlobalStatisticsManager.java
index 88a555b,0000000..d4071e8
mode 100644,000000..100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteGlobalStatisticsManager.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteGlobalStatisticsManager.java
@@@ -1,1021 -1,0 +1,1021 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.ignite.internal.processors.query.stat;
 +
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Set;
 +import java.util.UUID;
 +import java.util.concurrent.ConcurrentHashMap;
 +import java.util.concurrent.ConcurrentMap;
 +import java.util.function.Function;
 +import java.util.stream.Collectors;
 +
 +import org.apache.ignite.IgniteCheckedException;
 +import org.apache.ignite.IgniteLogger;
 +import org.apache.ignite.cluster.ClusterNode;
 +import org.apache.ignite.cluster.ClusterState;
 +import org.apache.ignite.events.DiscoveryEvent;
 +import org.apache.ignite.internal.GridTopic;
 +import org.apache.ignite.internal.events.DiscoveryCustomEvent;
 +import org.apache.ignite.internal.managers.communication.GridIoManager;
 +import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 +import org.apache.ignite.internal.managers.communication.GridMessageListener;
 +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
 +import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
 +import org.apache.ignite.internal.managers.systemview.GridSystemViewManager;
 +import org.apache.ignite.internal.managers.systemview.walker.StatisticsColumnGlobalDataViewWalker;
 +import org.apache.ignite.internal.managers.systemview.walker.StatisticsColumnLocalDataViewWalker;
 +import org.apache.ignite.internal.managers.systemview.walker.StatisticsColumnPartitionDataViewWalker;
 +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 +import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
 +import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
 +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
 +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
 +import org.apache.ignite.internal.processors.cluster.GridClusterStateProcessor;
 +import org.apache.ignite.internal.processors.query.stat.config.StatisticsColumnConfiguration;
 +import org.apache.ignite.internal.processors.query.stat.config.StatisticsObjectConfiguration;
 +import org.apache.ignite.internal.processors.query.stat.messages.StatisticsKeyMessage;
 +import org.apache.ignite.internal.processors.query.stat.messages.StatisticsObjectData;
 +import org.apache.ignite.internal.processors.query.stat.messages.StatisticsRequest;
 +import org.apache.ignite.internal.processors.query.stat.messages.StatisticsResponse;
 +import org.apache.ignite.internal.processors.query.stat.view.StatisticsColumnConfigurationView;
 +import org.apache.ignite.internal.processors.query.stat.view.StatisticsColumnGlobalDataView;
 +import org.apache.ignite.internal.util.IgniteUtils;
 +import org.apache.ignite.internal.util.typedef.F;
 +import org.apache.ignite.thread.IgniteThreadPoolExecutor;
 +
 +/**
 + * Global statistics manager. Cache global statistics and collect it.
 + */
 +public class IgniteGlobalStatisticsManager implements GridMessageListener {
 +    /** */
 +    private static final String STAT_GLOBAL_VIEW_NAME = "statisticsGlobalData";
 +
 +    /** */
 +    private static final String STAT_GLOBAL_VIEW_DESCRIPTION = "Global statistics.";
 +
 +    /** Statistics manager. */
 +    private final IgniteStatisticsManagerImpl statMgr;
 +
 +    /** Pool to process statistics requests. */
 +    private final IgniteThreadPoolExecutor mgmtPool;
 +
 +    /** Discovery manager to get server node list to statistics master calculation. */
 +    private final GridDiscoveryManager discoMgr;
 +
 +    /** Cluster state processor. */
 +    private final GridClusterStateProcessor cluster;
 +
 +    /** Cache partition exchange manager. */
 +    private final GridCachePartitionExchangeManager<?, ?> exchange;
 +
 +    /** Helper to transform or generate statistics related messages. */
 +    private final IgniteStatisticsHelper helper;
 +
 +    /** Grid io manager to exchange global and local statistics. */
 +    private final GridIoManager ioMgr;
 +
 +    /** Cache for global statistics. */
 +    private final ConcurrentMap<StatisticsKey, CacheEntry<ObjectStatisticsImpl>> globalStatistics =
 +        new ConcurrentHashMap<>();
 +
 +    /** Incoming requests which should be served after local statistics collection finish. */
 +    private final ConcurrentMap<StatisticsKey, Collection<StatisticsAddressedRequest>> inLocalRequests =
 +        new ConcurrentHashMap<>();
 +
 +    /** Incoming requests which should be served after global statistics collection finish. */
 +    private final ConcurrentMap<StatisticsKey, Collection<StatisticsAddressedRequest>> inGloblaRequests =
 +        new ConcurrentHashMap<>();
 +
 +    /** Outcoming global collection requests. */
 +    private final ConcurrentMap<StatisticsKey, StatisticsGatheringContext> curCollections = new ConcurrentHashMap<>();
 +
 +    /** Outcoming global statistics requests to request id. */
 +    private final ConcurrentMap<StatisticsKey, UUID> outGlobalStatisticsRequests = new ConcurrentHashMap<>();
 +
 +    /** Logger. */
 +    private final IgniteLogger log;
 +
 +    /** Started flag. */
 +    private boolean started;
 +
 +    /** Exchange listener: clean inbound requests and restart outbount. */
 +    private final PartitionsExchangeAware exchAwareLsnr = new PartitionsExchangeAware() {
 +        @Override public void onDoneAfterTopologyUnlock(GridDhtPartitionsExchangeFuture fut) {
 +
 +            // Skip join/left client nodes.
 +            if (fut.exchangeType() != GridDhtPartitionsExchangeFuture.ExchangeType.ALL ||
 +                cluster.clusterState().lastState() != ClusterState.ACTIVE)
 +                return;
 +
 +            DiscoveryEvent evt = fut.firstEvent();
 +
 +            // Skip create/destroy caches.
 +            if (evt.type() == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) {
 +                DiscoveryCustomMessage msg = ((DiscoveryCustomEvent)evt).customMessage();
 +
 +                if (msg instanceof DynamicCacheChangeBatch)
 +                    return;
 +
 +                // Just clear all activities and update topology version.
 +                if (log.isDebugEnabled())
 +                    log.debug("Resetting all global statistics activities due to new topology " +
 +                        fut.topologyVersion());
 +
 +                inLocalRequests.clear();
 +                inGloblaRequests.clear();
 +
 +                Set<StatisticsKey> curColls = curCollections.keySet();
 +
 +                for (StatisticsKey key : curColls) {
 +                    curCollections.remove(key);
 +
 +                    mgmtPool.submit(() -> collectGlobalStatistics(key));
 +                }
 +
 +                Set<StatisticsKey> outReqs = outGlobalStatisticsRequests.keySet();
 +
 +                for (StatisticsKey key : outReqs) {
 +                    outGlobalStatisticsRequests.remove(key);
 +
 +                    mgmtPool.submit(() -> collectGlobalStatistics(key));
 +                }
 +            }
 +        }
 +    };
 +
 +    /**
 +     * Constructor.
 +     *
 +     * @param statMgr Statistics manager.
 +     * @param sysViewMgr System view manager.
 +     * @param mgmtPool Statistics management pool.
 +     * @param discoMgr Grid discovery manager.
 +     * @param cluster Cluster state processor.
 +     * @param exchange Partition exchange manager.
 +     * @param helper Statistics helper.
 +     * @param ioMgr Communication manager.
 +     * @param logSupplier Log supplier.
 +     */
 +    public IgniteGlobalStatisticsManager(
 +        IgniteStatisticsManagerImpl statMgr,
 +        GridSystemViewManager sysViewMgr,
 +        IgniteThreadPoolExecutor mgmtPool,
 +        GridDiscoveryManager discoMgr,
 +        GridClusterStateProcessor cluster,
 +        GridCachePartitionExchangeManager<?, ?> exchange,
 +        IgniteStatisticsHelper helper,
 +        GridIoManager ioMgr,
 +        Function<Class<?>, IgniteLogger> logSupplier
 +    ) {
 +        this.statMgr = statMgr;
 +        this.mgmtPool = mgmtPool;
 +        this.discoMgr = discoMgr;
 +        this.cluster = cluster;
 +        this.exchange = exchange;
 +        this.helper = helper;
 +        this.ioMgr = ioMgr;
 +        log = logSupplier.apply(IgniteGlobalStatisticsManager.class);
 +
 +        statMgr.subscribeToLocalStatistics(nls -> onLocalStatisticsAggregated(nls.key(), nls.statistics(),
 +            nls.topologyVersion()));
 +        statMgr.subscribeToStatisticsConfig(this::onConfigChanged);
 +        ioMgr.addMessageListener(GridTopic.TOPIC_STATISTICS, this);
 +
 +        sysViewMgr.registerFiltrableView(STAT_GLOBAL_VIEW_NAME, STAT_GLOBAL_VIEW_DESCRIPTION,
 +            new StatisticsColumnGlobalDataViewWalker(), this::columnGlobalStatisticsViewSupplier, Function.identity());
 +    }
 +
 +    /**
 +     * Statistics column global data view filterable supplier.
 +     *
 +     * @param filter Filter.
 +     * @return Iterable with statistics column global data views.
 +     */
 +    private Iterable<StatisticsColumnGlobalDataView> columnGlobalStatisticsViewSupplier(Map<String, Object> filter) {
 +        String type = (String)filter.get(StatisticsColumnPartitionDataViewWalker.TYPE_FILTER);
 +        if (type != null && !StatisticsColumnConfigurationView.TABLE_TYPE.equalsIgnoreCase(type))
 +            return Collections.emptyList();
 +
 +        String schema = (String)filter.get(StatisticsColumnLocalDataViewWalker.SCHEMA_FILTER);
 +        String name = (String)filter.get(StatisticsColumnLocalDataViewWalker.NAME_FILTER);
 +        String column = (String)filter.get(StatisticsColumnPartitionDataViewWalker.COLUMN_FILTER);
 +
 +        Map<StatisticsKey, ObjectStatisticsImpl> globalStatsMap;
 +        if (!F.isEmpty(schema) && !F.isEmpty(name)) {
 +            StatisticsKey key = new StatisticsKey(schema, name);
 +
 +            CacheEntry<ObjectStatisticsImpl> objLocStat = globalStatistics.get(key);
 +
 +            if (objLocStat == null || objLocStat.obj == null)
 +                return Collections.emptyList();
 +
 +            globalStatsMap = Collections.singletonMap(key, objLocStat.object());
 +        }
 +        else
 +            globalStatsMap = globalStatistics.entrySet().stream()
 +                .filter(e -> e.getValue().object() != null && (F.isEmpty(schema) || schema.equals(e.getKey().schema())))
 +                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().object()));
 +
 +        List<StatisticsColumnGlobalDataView> res = new ArrayList<>();
 +
 +        for (Map.Entry<StatisticsKey, ObjectStatisticsImpl> localStatsEntry : globalStatsMap.entrySet()) {
 +            StatisticsKey key = localStatsEntry.getKey();
 +            ObjectStatisticsImpl stat = localStatsEntry.getValue();
 +
 +            if (column == null) {
 +                for (Map.Entry<String, ColumnStatistics> colStat : localStatsEntry.getValue().columnsStatistics()
 +                    .entrySet()) {
 +                    StatisticsColumnGlobalDataView colStatView = new StatisticsColumnGlobalDataView(key,
 +                        colStat.getKey(), stat);
 +
 +                    res.add(colStatView);
 +                }
 +            }
 +            else {
 +                ColumnStatistics colStat = localStatsEntry.getValue().columnStatistics(column);
 +
 +                if (colStat != null) {
 +                    StatisticsColumnGlobalDataView colStatView = new StatisticsColumnGlobalDataView(key, column, stat);
 +
 +                    res.add(colStatView);
 +                }
 +            }
 +        }
 +
 +        return res;
 +    }
 +
 +    /**
 +     * Start operations.
 +     * Shouldn't be called twice.
 +     */
 +    public synchronized void start() {
 +        if (started) {
 +            if (log.isDebugEnabled())
 +                log.debug("IgniteGlobalStatisticsManager already started.");
 +
 +            return;
 +        }
 +
 +        if (log.isDebugEnabled())
 +            log.debug("Global statistics manager starting...");
 +
 +        globalStatistics.clear();
 +        exchange.registerExchangeAwareComponent(exchAwareLsnr);
 +
 +        started = true;
 +
 +        if (log.isDebugEnabled())
 +            log.debug("Global statistics manager started.");
 +    }
 +
 +    /**
 +     * Stop operations.
 +     * Shouldn't be called twice.
 +     */
 +    public synchronized void stop() {
 +        if (!started) {
 +            if (log.isDebugEnabled())
 +                log.debug("IgniteGlobalStatisticsManager already stopped.");
 +
 +            return;
 +        }
 +
 +        if (log.isDebugEnabled())
 +            log.debug("Global statistics manager stopping...");
 +
 +        globalStatistics.clear();
 +
 +        inGloblaRequests.clear();
 +        inLocalRequests.clear();
 +        outGlobalStatisticsRequests.clear();
 +        curCollections.clear();
 +
 +        exchange.unregisterExchangeAwareComponent(exchAwareLsnr);
 +
 +        started = false;
 +
 +        if (log.isDebugEnabled())
 +            log.debug("Global statistics manager stopped.");
 +    }
 +
 +    /**
 +     * Get global statistics for the given key. If there is no cached statistics, but there is cache record with
 +     * empty object - no additional collection will be started.
 +     *
 +     * @param key Statistics key.
 +     * @return Global object statistics or {@code null} if there is no global statistics available.
 +     */
 +    public ObjectStatisticsImpl getGlobalStatistics(StatisticsKey key) {
 +        CacheEntry<ObjectStatisticsImpl> res = globalStatistics.computeIfAbsent(key, k -> {
 +            if (log.isDebugEnabled())
 +                log.debug("Scheduling global statistics collection by key " + key);
 +
 +            mgmtPool.submit(() -> collectGlobalStatistics(key));
 +
 +            return new CacheEntry<>(null);
 +        });
 +
 +        return res.object();
 +    }
 +
 +    /**
 +     * Either send local or global statistics request to get global statistics.
 +     *
 +     * @param key Statistics key to get global statistics by.
 +     */
 +    private void collectGlobalStatistics(StatisticsKey key) {
 +        try {
 +            StatisticsObjectConfiguration statCfg = statMgr.statisticConfiguration().config(key);
 +
 +            if (statCfg != null && !statCfg.columns().isEmpty()) {
 +                UUID statMaster = getStatisticsMasterNode(key);
 +
 +                if (discoMgr.localNode().id().equals(statMaster))
 +                    gatherGlobalStatistics(statCfg);
 +                else {
 +                    StatisticsKeyMessage keyMsg = new StatisticsKeyMessage(key.schema(), key.obj(),
 +                        Collections.emptyList());
 +
 +                    Map<String, Long> versions = statCfg.columns().entrySet().stream()
 +                        .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().version()));
 +
 +                    StatisticsRequest globalReq = new StatisticsRequest(UUID.randomUUID(), keyMsg,
 +                        StatisticsType.GLOBAL, null, versions);
 +
 +                    outGlobalStatisticsRequests.put(key, globalReq.reqId());
 +
 +                    if (log.isDebugEnabled())
 +                        log.debug("Send global statistics request by configuration " + statCfg);
 +
 +                    send(statMaster, globalReq);
 +
 +                }
 +            }
 +            else {
 +                if (log.isDebugEnabled())
 +                    log.debug("Unable to start global statistics collection due to lack of configuration by key "
 +                        + key);
 +            }
 +
 +        }
 +        catch (IgniteCheckedException e) {
 +            if (log.isInfoEnabled())
 +                log.info("Unable to get statistics configuration due to " + e.getMessage());
 +        }
 +    }
 +
 +    /**
 +     * Collect global statistics on master node.
 +     *
 +     * @param statCfg Statistics config to gather global statistics by.
 +     */
 +    private void gatherGlobalStatistics(StatisticsObjectConfiguration statCfg) throws IgniteCheckedException {
 +        if (log.isDebugEnabled())
 +            log.debug("Start global statistics collection by configuration " + statCfg);
 +
 +        StatisticsTarget target = new StatisticsTarget(statCfg.key());
 +
 +        List<StatisticsAddressedRequest> locRequests = helper.generateGatheringRequests(target, statCfg);
 +        UUID reqId = locRequests.get(0).req().reqId();
 +
 +        StatisticsGatheringContext gatCtx = new StatisticsGatheringContext(locRequests.size(), reqId, statCfg);
 +
 +        curCollections.put(statCfg.key(), gatCtx);
 +
 +        for (StatisticsAddressedRequest addReq : locRequests) {
 +            if (log.isDebugEnabled())
 +                log.debug("Sending local request " + addReq.req().reqId() + " to node " + addReq.nodeId());
 +
 +            send(addReq.nodeId(), addReq.req());
 +        }
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
 +        mgmtPool.submit(() -> {
 +            try {
 +                if (msg instanceof StatisticsRequest) {
 +                    StatisticsRequest req = (StatisticsRequest)msg;
 +                    switch (req.type()) {
 +                        case LOCAL:
 +                            processLocalRequest(nodeId, req);
 +
 +                            break;
 +
 +                        case GLOBAL:
 +                            processGlobalRequest(nodeId, req);
 +
 +                            break;
 +
 +                        default:
 +                            log.warning("Unexpected type " + req.type() + " in statistics request message " + req);
 +                    }
 +                }
 +                else if (msg instanceof StatisticsResponse) {
 +                    StatisticsResponse resp = (StatisticsResponse)msg;
 +
 +                    switch (resp.data().type()) {
 +                        case LOCAL:
 +                            processLocalResponse(nodeId, resp);
 +
 +                            break;
 +
 +                        case GLOBAL:
 +                            processGlobalResponse(nodeId, resp);
 +
 +                            break;
 +
 +                        default:
 +                            log.warning("Unexpected type " + resp.data().type() +
 +                                " in statistics reposonse message " + resp);
 +                    }
 +
 +                }
 +                else
 +                    log.warning("Unknown msg " + msg + " in statistics topic " + GridTopic.TOPIC_STATISTICS +
 +                        " from node " + nodeId);
 +            }
 +            catch (Throwable e) {
 +                log.warning("Unable to process statistics message: " + e.getMessage(), e);
 +            }
 +        });
 +    }
 +
 +    /**
 +     * Process request for local statistics.
 +     * 1) If there are local statistics for the given key - send response.
 +     * 2) If there is no such statistics - add request to incoming queue.
 +     *
 +     * @param nodeId Sender node id.
 +     * @param req Request to process.
 +     * @throws IgniteCheckedException
 +     */
 +    private void processLocalRequest(UUID nodeId, StatisticsRequest req) throws IgniteCheckedException {
 +        if (log.isDebugEnabled())
 +            log.debug("Got local statistics request from node " + nodeId + " : " + req);
 +
 +        StatisticsKey key = new StatisticsKey(req.key().schema(), req.key().obj());
 +
 +        ObjectStatisticsImpl objectStatistics = statMgr.getLocalStatistics(key, req.topVer());
 +
 +        if (StatisticsUtils.compareVersions(objectStatistics, req.versions()) == 0)
 +            sendResponse(nodeId, req.reqId(), key, StatisticsType.LOCAL, objectStatistics);
 +        else {
 +            addToRequests(inLocalRequests, key, new StatisticsAddressedRequest(req, nodeId));
 +
 +            objectStatistics = statMgr.getLocalStatistics(key, req.topVer());
 +
 +            if (StatisticsUtils.compareVersions(objectStatistics, req.versions()) == 0) {
 +                StatisticsAddressedRequest removed = removeFromRequests(inLocalRequests, key, req.reqId());
 +
 +                if (removed != null)
 +                    sendResponse(nodeId, req.reqId(), key, StatisticsType.LOCAL, objectStatistics);
 +            }
 +        }
 +    }
 +
 +    /**
 +     * Test if statistics configuration is fit to all required versions.
 +     * @param cfg Statistics configuration to check.
 +     * @param versions Map of column name to required version.
 +     * @return {@code true} if it is, {@code false} otherwise.
 +     */
 +    private boolean checkStatisticsCfg(StatisticsObjectConfiguration cfg, Map<String, Long> versions) {
 +        if (cfg == null)
 +            return false;
 +
 +        for (Map.Entry<String, Long> version : versions.entrySet()) {
 +            StatisticsColumnConfiguration colCfg = cfg.columns().get(version.getKey());
 +
 +            if (colCfg == null || colCfg.version() < version.getValue())
 +                return false;
 +        }
 +
 +        return true;
 +    }
 +
 +    /**
 +     * Process incoming request for global statistics. Either response (if it exists), or collect and response
 +     * (if current node is master node for the given key) or ignore (if current node is no more master node for
 +     * the given key.
 +     *
 +     * @param nodeId Sender node id.
 +     * @param req Request.
 +     */
 +    private void processGlobalRequest(UUID nodeId, StatisticsRequest req) throws IgniteCheckedException {
 +        if (log.isDebugEnabled())
 +            log.debug("Got global statistics request from node " + nodeId + " : " + req);
 +
 +        StatisticsKey key = new StatisticsKey(req.key().schema(), req.key().obj());
 +
 +        ObjectStatisticsImpl objStatistics = getGlobalStatistics(key, req.versions());
 +
 +        if (objStatistics == null) {
 +            if (discoMgr.localNode().id().equals(getStatisticsMasterNode(key))) {
 +
 +                addToRequests(inGloblaRequests, key, new StatisticsAddressedRequest(req, nodeId));
 +                globalStatistics.computeIfAbsent(key, k -> new CacheEntry<>(null));
 +
 +                if (!hasCurrentCollection(key, req.versions())) {
 +                    StatisticsObjectConfiguration objCfg = statMgr.statisticConfiguration().config(key);
 +
 +                    if (StatisticsUtils.compareVersions(objCfg, req.versions()) >= 0)
 +                        gatherGlobalStatistics(objCfg);
 +                    else {
 +                        if (log.isDebugEnabled())
 +                            log.debug("Wait for statistics configuration to process global statistics request " +
 +                                req.reqId());
 +                    }
 +                }
 +            }
 +
 +            objStatistics = getGlobalStatistics(key, req.versions());
 +
 +            if (objStatistics != null) {
 +                StatisticsAddressedRequest removed = removeFromRequests(inGloblaRequests, key, req.reqId());
 +
 +                if (removed != null)
 +                    sendResponse(nodeId, req.reqId(), key, StatisticsType.GLOBAL, objStatistics);
 +            }
 +        }
 +        else
 +            sendResponse(nodeId, req.reqId(), key, StatisticsType.GLOBAL, objStatistics);
 +    }
 +
 +    /**
 +     * Check if there are already started current collection with specified parameters.
 +     *
 +     * @param key Statistics key.
 +     * @param versions Reuqired versions.
 +     * @return {@code true} if there are already current collection with specifie parameters, {@code false} - otherwise.
 +     */
 +    private boolean hasCurrentCollection(StatisticsKey key, Map<String, Long> versions) {
 +        StatisticsGatheringContext ctx = curCollections.get(key);
 +
 +        if (ctx == null)
 +            return false;
 +
 +        return StatisticsUtils.compareVersions(ctx.configuration(), versions) == 0;
 +    }
 +
 +    /**
 +     * Get apptopriate global statistics from cache.
 +     *
 +     * @param key Statistics key.
 +     * @param versions Required versions.
 +     * @return Global statistics or {@code null} if there are no such global statistics.
 +     */
 +    private ObjectStatisticsImpl getGlobalStatistics(StatisticsKey key, Map<String, Long> versions) {
 +        CacheEntry<ObjectStatisticsImpl> objStatEntry = globalStatistics.get(key);
 +
 +        if (objStatEntry == null || StatisticsUtils.compareVersions(objStatEntry.object(), versions) != 0)
 +            return null;
 +
 +        return objStatEntry.object();
 +    }
 +
 +    /**
 +     * Build statistics response and send it to specified node.
 +     *
 +     * @param nodeId Target node id.
 +     * @param reqId Request id.
 +     * @param key Statistics key.
 +     * @param type Statistics type.
 +     * @param data Statitsics data.
 +     */
 +    private void sendResponse(
 +        UUID nodeId,
 +        UUID reqId,
 +        StatisticsKey key,
 +        StatisticsType type,
 +        ObjectStatisticsImpl data
 +    ) throws IgniteCheckedException {
 +        StatisticsKeyMessage keyMsg = new StatisticsKeyMessage(key.schema(), key.obj(), null);
 +        StatisticsObjectData dataMsg = StatisticsUtils.toObjectData(keyMsg, type, data);
 +
 +        send(nodeId, new StatisticsResponse(reqId, dataMsg));
 +    }
 +
 +    /**
 +     * Add to addressed requests map.
 +     *
 +     * @param map Map to add into.
 +     * @param key Request statistics key.
 +     * @param req Request to add.
 +     */
 +    private void addToRequests(
 +        ConcurrentMap<StatisticsKey, Collection<StatisticsAddressedRequest>> map,
 +        StatisticsKey key,
 +        StatisticsAddressedRequest req
 +    ) {
 +        map.compute(key, (k, v) -> {
 +            if (v == null)
 +                v = new ArrayList<>();
 +
 +            v.add(req);
 +
 +            return v;
 +        });
 +    }
 +
 +    /**
 +     * Check if specified map contains request with specified key and id, remove and return it.
 +     *
 +     * @param key Request statistics key.
 +     * @param reqId Request id.
 +     * @return Removed request.
 +     */
 +    private StatisticsAddressedRequest removeFromRequests(
 +        ConcurrentMap<StatisticsKey, Collection<StatisticsAddressedRequest>> map,
 +        StatisticsKey key,
 +        UUID reqId
 +    ) {
 +        StatisticsAddressedRequest[] res = new StatisticsAddressedRequest[1];
 +
 +        map.compute(key, (k, v) -> {
 +            if (v != null)
 +                res[0] = v.stream().filter(e -> reqId.equals(e.req().reqId())).findAny().orElse(null);
 +
 +            if (res[0] != null)
 +                v = v.stream().filter(e -> !reqId.equals(e.req().reqId())).collect(Collectors.toList());
 +
 +            return v;
 +        });
 +
 +        return res[0];
 +    }
 +
 +    /**
 +     * Process statistics configuration changes:
 +     *
 +     * 1) Remove all outbound activity by specified key, inbound may be suspended due to lack of
 +     *  requested configuration.
 +     * 2) Remove all inbound activity by changed key if reqiest col cfg versions lower than configuration col cfg versions.
 +     * 3.1) If there are no live column's config - remove cached global statistics.
 +     * 3.2) If there are some live columns config and global statistics cache contains statistics for the given key -
 +     * start to collect it again.
 +     */
 +    public void onConfigChanged(StatisticsObjectConfiguration cfg) {
-        StatisticsKey key = cfg.key();
++        StatisticsKey key = cfg.key();
 +
-        curCollections.remove(key);
-        outGlobalStatisticsRequests.remove(key);
++        curCollections.remove(key);
++        outGlobalStatisticsRequests.remove(key);
 +
-        inLocalRequests.computeIfPresent(key, (k, v) -> {
-            // Current config newer than income request - request should be invalidated.
-            v.removeIf(req -> StatisticsUtils.compareVersions(cfg, req.req().versions()) > 0);
++        inLocalRequests.computeIfPresent(key, (k, v) -> {
++            // Current config newer than income request - request should be invalidated.
++            v.removeIf(req -> StatisticsUtils.compareVersions(cfg, req.req().versions()) > 0);
 +
-            return (v.isEmpty()) ? null : v;
-        });
++            return (v.isEmpty()) ? null : v;
++        });
 +
-        inGloblaRequests.computeIfPresent(key, (k, v) -> {
-            v.removeIf(req -> StatisticsUtils.compareVersions(cfg, req.req().versions()) > 0);
++        inGloblaRequests.computeIfPresent(key, (k, v) -> {
++            v.removeIf(req -> StatisticsUtils.compareVersions(cfg, req.req().versions()) > 0);
 +
-            return (v.isEmpty()) ? null : v;
-        });
++            return (v.isEmpty()) ? null : v;
++        });
 +
-        if (cfg.columns().isEmpty())
-            globalStatistics.remove(key);
-        else {
-            globalStatistics.computeIfPresent(key, (k, v) -> {
-                if (v != null) {
-                    if (log.isDebugEnabled())
-                        log.debug("Scheduling global statistics recollection by key " + key);
++        if (cfg.columns().isEmpty())
++            globalStatistics.remove(key);
++        else {
++            globalStatistics.computeIfPresent(key, (k, v) -> {
++                if (v != null) {
++                    if (log.isDebugEnabled())
++                        log.debug("Scheduling global statistics recollection by key " + key);
 +
-                    mgmtPool.submit(() -> collectGlobalStatistics(key));
-                }
-                return v;
-            });
-        }
++                    mgmtPool.submit(() -> collectGlobalStatistics(key));
++                }
++                return v;
++            });
++        }
 +    }
 +
 +    /**
 +     * Clear global object statistics.
 +     *
 +     * @param key Object key to clear global statistics by.
 +     * @param colNames Only statistics by specified columns will be cleared.
 +     */
 +    public void clearGlobalStatistics(StatisticsKey key, Set<String> colNames) {
 +        globalStatistics.computeIfPresent(key, (k, v) -> {
 +            ObjectStatisticsImpl globStatOld = v.object();
 +            ObjectStatisticsImpl globStatNew = (globStatOld == null) ? null : globStatOld.subtract(colNames);
 +
 +            return (globStatNew == null || globStatNew.columnsStatistics().isEmpty()) ? null :
 +                new CacheEntry<>(globStatNew);
 +        });
 +
 +        outGlobalStatisticsRequests.remove(key);
 +    }
 +
 +    /**
 +     * Process response with local statistics. Try to finish collecting operation and send pending requests.
 +     *
 +     * @param nodeId Sender node id.
 +     * @param resp Statistics response to process.
 +     * @throws IgniteCheckedException In case of error.
 +     */
 +    private void processLocalResponse(UUID nodeId, StatisticsResponse resp) throws IgniteCheckedException {
 +        StatisticsKeyMessage keyMsg = resp.data().key();
 +        StatisticsKey key = new StatisticsKey(keyMsg.schema(), resp.data().key().obj());
 +
 +        if (log.isDebugEnabled())
 +            log.debug("Got local statistics response " + resp.reqId() + " from node " + nodeId + " by key " + key);
 +
 +        StatisticsGatheringContext curCtx = curCollections.get(key);
 +
 +        if (curCtx != null) {
 +            if (!curCtx.reqId().equals(resp.reqId())) {
 +                if (log.isDebugEnabled())
 +                    log.debug("Got outdated local statistics response " + resp + " instead of " + curCtx.reqId());
 +
 +                return;
 +            }
 +
 +            ObjectStatisticsImpl data = StatisticsUtils.toObjectStatistics(null, resp.data());
 +
 +            if (curCtx.registerResponse(data)) {
 +                StatisticsObjectConfiguration cfg = statMgr.statisticConfiguration().config(key);
 +
 +                if (cfg != null) {
 +                    if (log.isDebugEnabled())
 +                        log.debug("Aggregating global statistics for key " + key + " by request " + curCtx.reqId());
 +
 +                    ObjectStatisticsImpl globalStat = helper.aggregateLocalStatistics(cfg, curCtx.collectedData());
 +
 +                    globalStatistics.put(key, new CacheEntry<>(globalStat));
 +
 +                    if (log.isDebugEnabled())
 +                        log.debug("Global statistics for key " + key + " collected.");
 +
 +                    Collection<StatisticsAddressedRequest> globalRequests = inGloblaRequests.remove(key);
 +
 +                    if (globalRequests != null) {
 +                        StatisticsObjectData globalStatData = StatisticsUtils.toObjectData(keyMsg,
 +                            StatisticsType.GLOBAL, globalStat);
 +
 +                        for (StatisticsAddressedRequest req : globalRequests) {
 +                            StatisticsResponse outResp = new StatisticsResponse(req.req().reqId(), globalStatData);
 +
 +                            send(req.nodeId(), outResp);
 +                        }
 +                    }
 +                }
 +                else {
 +                    if (log.isDebugEnabled())
 +                        log.debug("Dropping collected statistics due to lack of configuration for key " + key);
 +                }
 +
 +                curCollections.remove(key);
 +            }
 +        }
 +        else {
 +            if (log.isDebugEnabled())
 +                log.debug("Got outdated local statistics response " + resp);
 +        }
 +    }
 +
 +    /**
 +     * Process response of global statistics.
 +     *
 +     * @param resp Response.
 +     * @throws IgniteCheckedException In case of error.
 +     */
 +    private void processGlobalResponse(UUID nodeId, StatisticsResponse resp) throws IgniteCheckedException {
 +        StatisticsKeyMessage keyMsg = resp.data().key();
 +        StatisticsKey key = new StatisticsKey(keyMsg.schema(), keyMsg.obj());
 +
 +        if (log.isDebugEnabled())
 +            log.debug("Got global statistics response " + resp.reqId() + " from node " + nodeId + " by key " + key);
 +
 +        UUID reqId = outGlobalStatisticsRequests.get(key);
 +
 +        if (reqId != null) {
 +            if (!resp.reqId().equals(reqId)) {
 +                if (log.isDebugEnabled())
 +                    log.debug("Got outdated global statistics response " + resp + " instead of " + reqId);
 +
 +                return;
 +            }
 +
 +            ObjectStatisticsImpl data = StatisticsUtils.toObjectStatistics(null, resp.data());
 +
 +            globalStatistics.put(key, new CacheEntry(data));
 +            outGlobalStatisticsRequests.remove(key);
 +        }
 +        else {
 +            if (log.isDebugEnabled())
 +                log.debug("Got outdated global statistics response " + resp);
 +        }
 +    }
 +
 +    /**
 +     * Calculate id of statistics master node for the given key.
 +     *
 +     * @param key Statistics key to calculate master node for.
 +     * @return UUID of statistics master node.
 +     */
 +    private UUID getStatisticsMasterNode(StatisticsKey key) {
 +        UUID[] nodes = discoMgr.aliveServerNodes().stream().map(ClusterNode::id).sorted().toArray(UUID[]::new);
 +        int idx = IgniteUtils.hashToIndex(key.obj().hashCode(), nodes.length);
 +
 +        return nodes[idx];
 +    }
 +
 +    /**
 +     * After collecting local statistics - check if there are some pending request for it and send responces.
 +     *
 +     * @param key Statistics key on which local statistics was aggregated.
 +     * @param statistics Collected statistics by key.
 +     * @param topVer Topology version which aggregated statistics stands for.
 +     */
 +    public void onLocalStatisticsAggregated(
 +        StatisticsKey key,
 +        ObjectStatisticsImpl statistics,
 +        AffinityTopologyVersion topVer
 +    ) {
 +        List<StatisticsAddressedRequest> inReqs = new ArrayList<>();
 +
 +        inLocalRequests.computeIfPresent(key, (k, v) -> {
 +            List<StatisticsAddressedRequest> left = new ArrayList<>();
 +
 +            for (StatisticsAddressedRequest req : v) {
 +                if (topVer.equals(req.req().topVer()) &&
 +                    StatisticsUtils.compareVersions(statistics, req.req().versions()) == 0)
 +                    inReqs.add(req);
 +                else
 +                    left.add(req);
 +            }
 +
 +            return (left.isEmpty()) ? null : left;
 +        });
 +
 +        if (inReqs.isEmpty())
 +            return;
 +
 +        for (StatisticsAddressedRequest req : inReqs) {
 +            try {
 +                sendResponse(req.nodeId(), req.req().reqId(), key, StatisticsType.LOCAL, statistics);
 +            }
 +            catch (IgniteCheckedException e) {
 +                log.info("Unable to send local object statistics for key " + key + " due to " + e.getMessage());
 +            }
 +        }
 +    }
 +
 +    /**
 +     * Send statistics related message.
 +     *
 +     * @param nodeId Target node id.
 +     * @param msg Message to send.
 +     * @throws IgniteCheckedException In case of error.
 +     */
 +    private void send(UUID nodeId, StatisticsRequest msg) throws IgniteCheckedException {
 +        if (discoMgr.localNode().id().equals(nodeId)) {
 +            switch (msg.type()) {
 +                case LOCAL:
 +                    processLocalRequest(nodeId, msg);
 +
 +                    break;
 +
 +                default:
 +                    log.warning("Unexpected type " + msg.type() + " in statistics request message " + msg);
 +            }
 +        }
 +        else
 +            ioMgr.sendToGridTopic(nodeId, GridTopic.TOPIC_STATISTICS, msg, GridIoPolicy.MANAGEMENT_POOL);
 +    }
 +
 +    /**
 +     * Send statistics response or process it locally.
 +     *
 +     * @param nodeId Target node id. If equals to local node - corresponding method will be called directly.
 +     * @param msg Statistics response to send.
 +     * @throws IgniteCheckedException In case of error.
 +     */
 +    private void send(UUID nodeId, StatisticsResponse msg) throws IgniteCheckedException {
 +        if (discoMgr.localNode().id().equals(nodeId)) {
 +            switch (msg.data().type()) {
 +                case LOCAL:
 +                    processLocalResponse(nodeId, msg);
 +
 +                    break;
 +
 +                case GLOBAL:
 +                    processGlobalResponse(nodeId, msg);
 +
 +                    break;
 +
 +                default:
 +                    log.warning("Unexpected type " + msg.data().type() + " in statistics response message " + msg);
 +            }
 +        }
 +        else
 +            ioMgr.sendToGridTopic(nodeId, GridTopic.TOPIC_STATISTICS, msg, GridIoPolicy.MANAGEMENT_POOL);
 +    }
 +
 +    /** Cache entry. */
 +    private static class CacheEntry<T> {
 +        /** Cached object. */
 +        private final T obj;
 +
 +        /**
 +         * Constructor.
 +         *
 +         * @param obj Cached object.
 +         */
 +        public CacheEntry(T obj) {
 +            this.obj = obj;
 +        }
 +
 +        /**
 +         * @return Cached object.
 +         */
 +        public T object() {
 +            return obj;
 +        }
 +    }
 +
 +    /** Context of global statistics gathering. */
 +    private static class StatisticsGatheringContext {
 +        /** Number of remaining requests. */
 +        private int remainingResponses;
 +
 +        /** Requests id. */
 +        private final UUID reqId;
 +
 +        /** Local object statistics from responses. */
 +        private final Collection<ObjectStatisticsImpl> responses = new ArrayList<>();
 +
 +        /** Configuration, used to collect statistics. */
 +        private final StatisticsObjectConfiguration cfg;
 +
 +        /**
 +         * Constructor.
 +         *
 +         * @param responseCont Expectiong response count.
 +         * @param reqId Requests id.
 +         * @param cfg Configuration, used to collect statistics.
 +         */
 +        public StatisticsGatheringContext(int responseCont, UUID reqId, StatisticsObjectConfiguration cfg) {
 +            remainingResponses = responseCont;
 +            this.reqId = reqId;
 +            this.cfg = cfg;
 +        }
 +
 +        /**
 +         * Register response.
 +         *
 +         * @param data Object statistics from response.
 +         * @return {@code true} if all respones collected, {@code false} otherwise.
 +         */
 +        public synchronized boolean registerResponse(ObjectStatisticsImpl data) {
 +            responses.add(data);
 +            return --remainingResponses == 0;
 +        }
 +
 +        /**
 +         * @return Requests id.
 +         */
 +        public UUID reqId() {
 +            return reqId;
 +        }
 +
 +        /**
 +         * Get collected local object statistics.
 +         * @return Local object statistics.
 +         */
 +        public Collection<ObjectStatisticsImpl> collectedData() {
 +            assert remainingResponses == 0;
 +
 +            return responses;
 +        }
 +
 +        /**
 +         * @return Statistics configuration, used to start gathering.
 +         */
 +        public StatisticsObjectConfiguration configuration() {
 +            return cfg;
 +        }
 +    }
 +}
diff --cc modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/SqlStatisticsCommandTests.java
index 3d65b2f,5c681b8..f395c19
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/SqlStatisticsCommandTests.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/SqlStatisticsCommandTests.java
@@@ -28,9 -28,12 +28,11 @@@ import org.apache.ignite.internal.proce
  import org.apache.ignite.internal.util.typedef.G;
  import org.apache.ignite.internal.util.typedef.internal.U;
  import org.apache.ignite.testframework.GridTestUtils;
 -import org.apache.log4j.Level;
 -import org.apache.log4j.Logger;
 +import org.junit.Ignore;
  import org.junit.Test;
  
+ import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+ 
  /**
   * Integration tests for statistics collection.
   */
diff --cc modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsGlobalViewTest.java
index 7cf0c07,0000000..adfa0de
mode 100644,000000..100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsGlobalViewTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsGlobalViewTest.java
@@@ -1,160 -1,0 +1,160 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.ignite.internal.processors.query.stat;
 +
 +import java.util.Arrays;
 +import java.util.List;
 +import org.apache.ignite.Ignite;
 +import org.apache.ignite.cluster.ClusterState;
 +import org.apache.ignite.internal.IgniteEx;
 +import org.apache.ignite.internal.util.typedef.G;
 +import org.apache.ignite.testframework.GridTestUtils;
 +import org.junit.Test;
 +
 +/**
 + * Tests for global statistics view.
 + */
 +public abstract class StatisticsGlobalViewTest extends StatisticsAbstractTest {
 +    /** {@inheritDoc} */
 +    @Override protected void beforeTest() throws Exception {
 +        super.beforeTestsStarted();
 +        cleanPersistenceDir();
 +
 +        startGridsMultiThreaded(2);
 +        grid(0).cluster().state(ClusterState.ACTIVE);
 +
 +        grid(0).getOrCreateCache(DEFAULT_CACHE_NAME);
 +
 +        createSmallTable(null);
 +        collectStatistics(StatisticsType.GLOBAL, SMALL_TARGET);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override protected void afterTest() throws Exception {
 +        stopAllGrids();
 +
 +        super.afterTest();
 +    }
 +
 +    /**
 +     * Start additional node and try to collect statistics without adding it into baseline topology.
 +     * Check that statistics awailable, but no local statistics exists and no additional gathering tasks
 +     * stick in such node.
 +     *
 +     * @throws Exception In case of errors.
 +     */
 +    @Test
 +    public void testStatisticsCollectionOutsideBaseline() throws Exception {
 +        List<List<Object>> partLines = Arrays.asList(
 +            Arrays.asList(SCHEMA, "TABLE", "SMALL", "A", (long)SMALL_SIZE, (long)SMALL_SIZE, 0L, 100L, 4, null, null),
 +            Arrays.asList(SCHEMA, "TABLE", "SMALL", "B", (long)SMALL_SIZE, (long)SMALL_SIZE, 0L, 100L, 4, null, null),
 +            Arrays.asList(SCHEMA, "TABLE", "SMALL", "C", (long)SMALL_SIZE, 10L, 0L, 100L, 4, null, null)
 +        );
 +
 +        checkSqlResult("select * from SYS.STATISTICS_GLOBAL_DATA where NAME = 'SMALL'", null, act -> {
 +            checkContains(partLines, act);
 +
 +            return true;
 +        });
 +
 +        ignite(0).cluster().baselineAutoAdjustEnabled(false);
 +
 +        IgniteEx ign2 = startGrid(2);
 +        awaitPartitionMapExchange();
 +
 +        requestGlobalStatistics(SMALL_KEY);
 +
 +        assertTrue(GridTestUtils.waitForCondition(() -> statisticsMgr(ign2).getGlobalStatistics(SMALL_KEY) != null,
 +            TIMEOUT));
 +
 +        checkSqlResult("select * from SYS.STATISTICS_GLOBAL_DATA where NAME = 'SMALL'", null, act -> {
 +            checkContains(partLines, act);
 +
 +            return true;
 +        });
 +
 +        stopGrid(2);
 +    }
 +
 +    /**
 +     * Check that global stats equals on each node in cluster:
 +     * 1) Check global statistics on each grid nodes.
 +     * 2) Start new node and check global statistics.
 +     * 3) Collect statistics configuration and check it on each node.
 +     */
 +    @Test
 +    public void testGlobalStatEquals() throws Exception {
 +        List<List<Object>> partLines = Arrays.asList(
 +            Arrays.asList(SCHEMA, "TABLE", "SMALL", "A", (long)SMALL_SIZE, (long)SMALL_SIZE, 0L, 100L, 4, null, null),
 +            Arrays.asList(SCHEMA, "TABLE", "SMALL", "B", (long)SMALL_SIZE, (long)SMALL_SIZE, 0L, 100L, 4, null, null),
 +            Arrays.asList(SCHEMA, "TABLE", "SMALL", "C", (long)SMALL_SIZE, 10L, 0L, 100L, 4, null, null)
 +        );
 +
 +        checkSqlResult("select * from SYS.STATISTICS_GLOBAL_DATA where NAME = 'SMALL'", null, act -> {
 +            checkContains(partLines, act);
 +
 +            return true;
 +        });
 +
 +        startGrid(2);
 +        awaitPartitionMapExchange();
 +
 +        requestGlobalStatistics(SMALL_KEY);
 +
 +        long minVer = minStatVer(statisticsMgr(0), SMALL_TARGET);
 +
 +        checkSqlResult("select * from SYS.STATISTICS_GLOBAL_DATA where NAME = 'SMALL' and VERSION >= " + minVer,
 +            null, list -> !list.isEmpty());
 +
 +        ignite(0).cluster().baselineAutoAdjustEnabled(false);
 +        ignite(0).cluster().setBaselineTopology(ignite(1).cluster().topologyVersion());
 +        awaitPartitionMapExchange(true, true, null);
 +
 +        for (Ignite ign : G.allGrids()) {
 +
 +            System.out.println("node = " + ign.cluster().localNode().id() +
 +                " is Server = " + !((IgniteEx)ign).localNode().isClient() + " local stat = " +
 +                statisticsMgr((IgniteEx)ign).getLocalStatistics(SMALL_KEY));
 +        }
 +
 +        collectStatistics(StatisticsType.GLOBAL, SMALL_TARGET);
 +
 +        minVer++;
 +
 +        checkSqlResult("select * from SYS.STATISTICS_GLOBAL_DATA where NAME = 'SMALL' and VERSION >= " + minVer,
 +            null, act -> {
-             checkContains(partLines, act);
-             return true;
-         });
++                checkContains(partLines, act);
++                return true;
++            });
 +
 +        stopGrid(2);
 +    }
 +
 +    /**
 +     * Request global statistics by specified key from each node.
 +     *
 +     * @param key Key to request global statistics by.
 +     */
 +    private void requestGlobalStatistics(StatisticsKey key) {
 +        for (Ignite ign : G.allGrids()) {
 +            IgniteStatisticsManagerImpl nodeMgr = statisticsMgr((IgniteEx)ign);
 +
 +            nodeMgr.getGlobalStatistics(key);
 +        }
 +    }
 +}