You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by iv...@apache.org on 2021/11/16 11:06:36 UTC
[ignite] branch master updated: IGNITE-15794 Implement request attributes support for java services - Fixes #9510.
This is an automated email from the ASF dual-hosted git repository.
ivandasch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 95c8c6b IGNITE-15794 Implement request attributes support for java services - Fixes #9510.
95c8c6b is described below
commit 95c8c6bdb2bcd1baf9be8f9d1ac22ed777bc8120
Author: Pavel Pereslegin <xx...@gmail.com>
AuthorDate: Tue Nov 16 14:00:22 2021 +0300
IGNITE-15794 Implement request attributes support for java services - Fixes #9510.
Signed-off-by: Ivan Daschinsky <iv...@apache.org>
---
.../java/org/apache/ignite/IgniteServices.java | 49 +++++
.../apache/ignite/internal/IgniteServicesEx.java | 54 +++++
.../apache/ignite/internal/IgniteServicesImpl.java | 46 +++-
.../client/service/ClientServiceInvokeRequest.java | 4 +-
.../platform/services/PlatformServices.java | 4 +-
.../resource/GridResourceServiceInjector.java | 55 +++--
.../processors/service/GridServiceProcessor.java | 15 +-
.../processors/service/GridServiceProxy.java | 93 +++++++-
.../processors/service/IgniteServiceProcessor.java | 17 +-
.../service/ServiceCallContextHolder.java | 46 ++++
.../processors/service/ServiceCallContextImpl.java | 77 +++++++
.../processors/service/ServiceContextImpl.java | 6 +
.../service/ServiceProcessorAdapter.java | 14 +-
.../apache/ignite/resources/ServiceResource.java | 7 +
.../apache/ignite/services/ServiceCallContext.java | 86 ++++++++
.../ignite/services/ServiceCallContextBuilder.java | 77 +++++++
.../org/apache/ignite/services/ServiceContext.java | 8 +
.../service/IgniteServiceCallContextTest.java | 238 +++++++++++++++++++++
.../testsuites/IgniteServiceGridTestSuite.java | 2 +
19 files changed, 840 insertions(+), 58 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteServices.java b/modules/core/src/main/java/org/apache/ignite/IgniteServices.java
index 11b2fd9..769d322 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteServices.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteServices.java
@@ -22,9 +22,11 @@ import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.lang.IgniteAsyncSupport;
import org.apache.ignite.lang.IgniteAsyncSupported;
+import org.apache.ignite.lang.IgniteExperimental;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.services.Service;
+import org.apache.ignite.services.ServiceCallContext;
import org.apache.ignite.services.ServiceConfiguration;
import org.apache.ignite.services.ServiceDeploymentException;
import org.apache.ignite.services.ServiceDescriptor;
@@ -610,6 +612,53 @@ public interface IgniteServices extends IgniteAsyncSupport {
public <T> T serviceProxy(String name, Class<? super T> svcItf, boolean sticky, long timeout)
throws IgniteException;
+ /**
+ * Gets a remote handle on the service with the specified caller context. The proxy
+ * is dynamically created and provided for the specified service.
+ *
+ * @param name Service name.
+ * @param svcItf Interface for the service.
+ * @param sticky Whether or not Ignite should always contact the same remote
+ * service or try to load-balance between services.
+ * @param callCtx Service call context.
+ * @param <T> Service type.
+ * @return Proxy over service.
+ * @throws IgniteException If failed to create service proxy.
+ * @see ServiceCallContext
+ */
+ @IgniteExperimental
+ public <T> T serviceProxy(
+ String name,
+ Class<? super T> svcItf,
+ boolean sticky,
+ ServiceCallContext callCtx
+ ) throws IgniteException;
+
+ /**
+ * Gets a remote handle on the service with the specified caller context and timeout.
+ * The proxy is dynamically created and provided for the specified service.
+ *
+ * @param name Service name.
+ * @param svcItf Interface for the service.
+ * @param sticky Whether or not Ignite should always contact the same remote
+ * service or try to load-balance between services.
+ * @param callCtx Service call context.
+ * @param timeout If greater than 0 created proxy will wait for service availability only specified time,
+ * and will limit remote service invocation time.
+ * @param <T> Service type.
+ * @return Proxy over service.
+ * @throws IgniteException If failed to create service proxy.
+ * @see ServiceCallContext
+ */
+ @IgniteExperimental
+ public <T> T serviceProxy(
+ String name,
+ Class<? super T> svcItf,
+ boolean sticky,
+ ServiceCallContext callCtx,
+ long timeout
+ ) throws IgniteException;
+
/** {@inheritDoc} */
@Deprecated
@Override public IgniteServices withAsync();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteServicesEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteServicesEx.java
new file mode 100644
index 0000000..0966a18
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteServicesEx.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import java.util.function.Supplier;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteServices;
+import org.apache.ignite.services.ServiceCallContext;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Extended interface that provides additional internal methods for managing services.
+ */
+public interface IgniteServicesEx extends IgniteServices {
+ /**
+ * Gets a remote handle on the service. If service is available locally and no caller context provider is
+ * specified, then a local instance is returned and the timeout is ignored, otherwise, a proxy is dynamically
+ * created and provided for the specified service.
+ *
+ * @param name Service name.
+ * @param svcItf Interface for the service.
+ * @param sticky Whether or not Ignite should always contact the same remote
+ * service or try to load-balance between services.
+ * @param callCtxProvider Caller context provider.
+ * @param timeout If greater than 0 created proxy will wait for service availability only specified time,
+ * and will limit remote service invocation time.
+ * @param <T> Service type.
+ * @return Either proxy over remote service or local service if it is deployed locally and no caller context
+ * provider is specified.
+ * @throws IgniteException If failed to create service proxy.
+ */
+ public <T> T serviceProxy(
+ String name,
+ Class<? super T> svcItf,
+ boolean sticky,
+ @Nullable Supplier<ServiceCallContext> callCtxProvider,
+ long timeout
+ ) throws IgniteException;
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteServicesImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteServicesImpl.java
index 59d6136..a01e546 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteServicesImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteServicesImpl.java
@@ -24,6 +24,7 @@ import java.io.ObjectOutput;
import java.io.ObjectStreamException;
import java.util.Collection;
import java.util.Collections;
+import java.util.function.Supplier;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteServices;
@@ -34,6 +35,7 @@ import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.services.Service;
+import org.apache.ignite.services.ServiceCallContext;
import org.apache.ignite.services.ServiceConfiguration;
import org.apache.ignite.services.ServiceDescriptor;
import org.jetbrains.annotations.Nullable;
@@ -41,7 +43,7 @@ import org.jetbrains.annotations.Nullable;
/**
* {@link org.apache.ignite.IgniteServices} implementation.
*/
-public class IgniteServicesImpl extends AsyncSupportAdapter implements IgniteServices, Externalizable {
+public class IgniteServicesImpl extends AsyncSupportAdapter implements IgniteServicesEx, Externalizable {
/** */
private static final long serialVersionUID = 0L;
@@ -374,8 +376,44 @@ public class IgniteServicesImpl extends AsyncSupportAdapter implements IgniteSer
}
/** {@inheritDoc} */
- @Override public <T> T serviceProxy(final String name, final Class<? super T> svcItf, final boolean sticky,
- final long timeout) throws IgniteException {
+ @Override public <T> T serviceProxy(
+ final String name,
+ final Class<? super T> svcItf,
+ final boolean sticky,
+ final long timeout
+ ) throws IgniteException {
+ return (T)serviceProxy(name, svcItf, sticky, (Supplier<ServiceCallContext>)null, timeout);
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> T serviceProxy(
+ final String name,
+ final Class<? super T> svcItf,
+ final boolean sticky,
+ @Nullable ServiceCallContext callCtx
+ ) throws IgniteException {
+ return (T)serviceProxy(name, svcItf, sticky, callCtx, 0);
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> T serviceProxy(
+ final String name,
+ final Class<? super T> svcItf,
+ final boolean sticky,
+ @Nullable ServiceCallContext callCtx,
+ final long timeout
+ ) throws IgniteException {
+ return (T)serviceProxy(name, svcItf, sticky, callCtx != null ? () -> callCtx : null, timeout);
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> T serviceProxy(
+ final String name,
+ final Class<? super T> svcItf,
+ final boolean sticky,
+ @Nullable Supplier<ServiceCallContext> callCtxProvider,
+ final long timeout
+ ) throws IgniteException {
A.notNull(name, "name");
A.notNull(svcItf, "svcItf");
A.ensure(svcItf.isInterface(), "Service class must be an interface: " + svcItf);
@@ -384,7 +422,7 @@ public class IgniteServicesImpl extends AsyncSupportAdapter implements IgniteSer
guard();
try {
- return (T)ctx.service().serviceProxy(prj, name, svcItf, sticky, timeout);
+ return (T)ctx.service().serviceProxy(prj, name, svcItf, sticky, callCtxProvider, timeout);
}
finally {
unguard();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/service/ClientServiceInvokeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/service/ClientServiceInvokeRequest.java
index 7d4c0dc..acf0b80 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/service/ClientServiceInvokeRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/service/ClientServiceInvokeRequest.java
@@ -168,13 +168,13 @@ public class ClientServiceInvokeRequest extends ClientRequest {
}
GridServiceProxy<?> proxy = new GridServiceProxy<>(grp, name, Service.class, false, timeout,
- ctx.kernalContext());
+ ctx.kernalContext(), null);
Method method = resolveMethod(ctx, svcCls);
PlatformServices.convertArrayArgs(args, method);
- res = proxy.invokeMethod(method, args);
+ res = proxy.invokeMethod(method, args, null);
}
return new ClientObjectResponse(requestId(), res);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java
index 421b35e..8e62494 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java
@@ -383,7 +383,7 @@ public class PlatformServices extends PlatformAbstractTarget {
Object proxy = PlatformService.class.isAssignableFrom(d.serviceClass())
? services.serviceProxy(name, PlatformService.class, sticky)
: new GridServiceProxy<>(services.clusterGroup(), name, Service.class, sticky, 0,
- platformCtx.kernalContext());
+ platformCtx.kernalContext(), null);
return new ServiceProxyHolder(proxy, d.serviceClass(), platformContext());
}
@@ -622,7 +622,7 @@ public class PlatformServices extends PlatformAbstractTarget {
Method mtd = getMethod(serviceClass, mthdName, args);
convertArrayArgs(args, mtd);
- return ((GridServiceProxy)proxy).invokeMethod(mtd, args);
+ return ((GridServiceProxy)proxy).invokeMethod(mtd, args, null);
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceServiceInjector.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceServiceInjector.java
index 366b0a8..a4b6cde 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceServiceInjector.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceServiceInjector.java
@@ -18,23 +18,26 @@
package org.apache.ignite.internal.processors.resource;
import java.util.Collection;
-import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteServicesEx;
import org.apache.ignite.internal.managers.deployment.GridDeployment;
+import org.apache.ignite.internal.processors.service.ServiceCallContextHolder;
import org.apache.ignite.resources.ServiceResource;
import org.apache.ignite.services.Service;
+import org.jetbrains.annotations.Nullable;
/**
* Grid service injector.
*/
public class GridResourceServiceInjector extends GridResourceBasicInjector<Collection<Service>> {
/** */
- private Ignite ignite;
+ private IgniteEx ignite;
/**
* @param ignite Grid.
*/
- public GridResourceServiceInjector(Ignite ignite) {
+ public GridResourceServiceInjector(IgniteEx ignite) {
super(null);
this.ignite = ignite;
@@ -43,16 +46,7 @@ public class GridResourceServiceInjector extends GridResourceBasicInjector<Colle
/** {@inheritDoc} */
@Override public void inject(GridResourceField field, Object target, Class<?> depCls, GridDeployment dep)
throws IgniteCheckedException {
- ServiceResource ann = (ServiceResource)field.getAnnotation();
-
- Class svcItf = ann.proxyInterface();
-
- Object svc;
-
- if (svcItf == Void.class)
- svc = ignite.services().service(ann.serviceName());
- else
- svc = ignite.services().serviceProxy(ann.serviceName(), svcItf, ann.proxySticky());
+ Object svc = getService((ServiceResource)field.getAnnotation());
if (svc != null)
GridResourceUtils.inject(field.getField(), target, svc);
@@ -61,24 +55,27 @@ public class GridResourceServiceInjector extends GridResourceBasicInjector<Colle
/** {@inheritDoc} */
@Override public void inject(GridResourceMethod mtd, Object target, Class<?> depCls, GridDeployment dep)
throws IgniteCheckedException {
- ServiceResource ann = (ServiceResource)mtd.getAnnotation();
-
- Class svcItf = ann.proxyInterface();
-
- Object svc;
-
- if (svcItf == Void.class)
- svc = ignite.services().service(ann.serviceName());
- else
- svc = ignite.services().serviceProxy(ann.serviceName(), svcItf, ann.proxySticky());
-
- Class<?>[] types = mtd.getMethod().getParameterTypes();
-
- if (types.length != 1)
- throw new IgniteCheckedException("Setter does not have single parameter of required type [type=" +
- svc.getClass().getName() + ", setter=" + mtd + ']');
+ Object svc = getService((ServiceResource)mtd.getAnnotation());
if (svc != null)
GridResourceUtils.inject(mtd.getMethod(), target, svc);
}
+
+ /**
+ * @param ann Service resource annotation.
+ * @return Proxy for the service if a proxy interface was specified, otherwise the service itself or {@code null}
+ * if the service is not deployed locally.
+ */
+ private @Nullable <T> T getService(ServiceResource ann) {
+ if (ann.proxyInterface() == Void.class)
+ return ignite.services().service(ann.serviceName());
+
+ return ((IgniteServicesEx)ignite.services()).serviceProxy(
+ ann.serviceName(),
+ (Class<? super T>)ann.proxyInterface(),
+ ann.proxySticky(),
+ ann.forwardCallerContext() ? ServiceCallContextHolder::current : null,
+ 0
+ );
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 13b1f02..0debe6a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -37,6 +37,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
import javax.cache.Cache;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryUpdatedListener;
@@ -102,6 +103,7 @@ import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.resources.JobContextResource;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.services.Service;
+import org.apache.ignite.services.ServiceCallContext;
import org.apache.ignite.services.ServiceConfiguration;
import org.apache.ignite.services.ServiceContext;
import org.apache.ignite.services.ServiceDeploymentException;
@@ -1021,9 +1023,14 @@ public class GridServiceProcessor extends ServiceProcessorAdapter implements Ign
}
/** {@inheritDoc} */
- @Override public <T> T serviceProxy(ClusterGroup prj, String name, Class<? super T> srvcCls, boolean sticky,
- long timeout)
- throws IgniteException {
+ @Override public <T> T serviceProxy(
+ ClusterGroup prj,
+ String name,
+ Class<? super T> srvcCls,
+ boolean sticky,
+ @Nullable Supplier<ServiceCallContext> callCtxProvider,
+ long timeout
+ ) throws IgniteException {
ctx.security().authorize(name, SecurityPermission.SERVICE_INVOKE);
if (hasLocalNode(prj)) {
@@ -1043,7 +1050,7 @@ public class GridServiceProcessor extends ServiceProcessorAdapter implements Ign
}
}
- return new GridServiceProxy<T>(prj, name, srvcCls, sticky, timeout, ctx).proxy();
+ return new GridServiceProxy<T>(prj, name, srvcCls, sticky, timeout, ctx, callCtxProvider).proxy();
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java
index cc65f34..e101b6c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java
@@ -34,6 +34,7 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
@@ -55,6 +56,8 @@ import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.platform.PlatformServiceMethod;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.services.Service;
+import org.apache.ignite.services.ServiceCallContext;
+import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_IO_POLICY;
@@ -114,14 +117,16 @@ public class GridServiceProxy<T> implements Serializable {
* @param sticky Whether multi-node request should be done.
* @param timeout Service availability wait timeout. Cannot be negative.
* @param ctx Context.
+ * @param callCtxProvider Caller context provider.
*/
public GridServiceProxy(ClusterGroup prj,
String name,
Class<? super T> svc,
boolean sticky,
long timeout,
- GridKernalContext ctx)
- {
+ GridKernalContext ctx,
+ @Nullable Supplier<ServiceCallContext> callCtxProvider
+ ) {
assert timeout >= 0 : timeout;
this.prj = prj;
@@ -137,7 +142,7 @@ public class GridServiceProxy<T> implements Serializable {
proxy = (T)Proxy.newProxyInstance(
svc.getClassLoader(),
new Class[] {svc},
- new ProxyInvocationHandler()
+ new ProxyInvocationHandler(callCtxProvider)
);
}
@@ -159,10 +164,15 @@ public class GridServiceProxy<T> implements Serializable {
*
* @param mtd Method.
* @param args Arugments.
+ * @param callCtx Service call context.
* @return Result.
*/
@SuppressWarnings("BusyWait")
- public Object invokeMethod(final Method mtd, final Object[] args) throws Throwable {
+ public Object invokeMethod(
+ final Method mtd,
+ final Object[] args,
+ @Nullable ServiceCallContext callCtx
+ ) throws Throwable {
if (U.isHashCodeMethod(mtd))
return System.identityHashCode(proxy);
else if (U.isEqualsMethod(mtd))
@@ -192,7 +202,7 @@ public class GridServiceProxy<T> implements Serializable {
Service svc = svcCtx.service();
if (svc != null)
- return callServiceLocally(svc, mtd, args);
+ return callServiceLocally(svc, mtd, args, callCtx);
}
}
else {
@@ -201,7 +211,7 @@ public class GridServiceProxy<T> implements Serializable {
// Execute service remotely.
return ctx.closure().callAsyncNoFailover(
GridClosureCallMode.BROADCAST,
- new ServiceProxyCallable(methodName(mtd), name, mtd.getParameterTypes(), args),
+ new ServiceProxyCallable(methodName(mtd), name, mtd.getParameterTypes(), args, callCtx),
Collections.singleton(node),
false,
waitTimeout,
@@ -268,13 +278,44 @@ public class GridServiceProxy<T> implements Serializable {
* @param svc Service to be called.
* @param mtd Method to call.
* @param args Method args.
+ * @param callCtx Service call context.
* @return Invocation result.
*/
- private Object callServiceLocally(Service svc, Method mtd, Object[] args) throws Exception {
+ private Object callServiceLocally(
+ Service svc,
+ Method mtd,
+ Object[] args,
+ @Nullable ServiceCallContext callCtx
+ ) throws Exception {
if (svc instanceof PlatformService && !PLATFORM_SERVICE_INVOKE_METHOD.equals(mtd))
return ((PlatformService)svc).invokeMethod(methodName(mtd), false, true, args);
else
+ return callServiceMethod(svc, mtd, args, callCtx);
+ }
+
+ /**
+ * @param svc Service to be called.
+ * @param mtd Method to call.
+ * @param args Method args.
+ * @param callCtx Service call context.
+ * @return Invocation result.
+ */
+ private static Object callServiceMethod(
+ Service svc,
+ Method mtd,
+ Object[] args,
+ @Nullable ServiceCallContext callCtx
+ ) throws InvocationTargetException, IllegalAccessException {
+ if (callCtx != null)
+ ServiceCallContextHolder.current(callCtx);
+
+ try {
return mtd.invoke(svc, args);
+ }
+ finally {
+ if (callCtx != null)
+ ServiceCallContextHolder.current(null);
+ }
}
/**
@@ -400,10 +441,19 @@ public class GridServiceProxy<T> implements Serializable {
* Invocation handler for service proxy.
*/
private class ProxyInvocationHandler implements InvocationHandler {
+ /** Caller context provider. */
+ private final Supplier<ServiceCallContext> callCtxProvider;
+
+ /**
+ * @param callCtxProvider Caller context provider.
+ */
+ public ProxyInvocationHandler(@Nullable Supplier<ServiceCallContext> callCtxProvider) {
+ this.callCtxProvider = callCtxProvider;
+ }
/** {@inheritDoc} */
@Override public Object invoke(Object proxy, final Method mtd, final Object[] args) throws Throwable {
- return invokeMethod(mtd, args);
+ return invokeMethod(mtd, args, callCtxProvider != null ? callCtxProvider.get() : null);
}
}
@@ -426,6 +476,9 @@ public class GridServiceProxy<T> implements Serializable {
/** Args. */
private Object[] args;
+ /** Service call context. */
+ private ServiceCallContext callCtx;
+
/** Grid instance. */
@IgniteInstanceResource
private transient IgniteEx ignite;
@@ -442,12 +495,20 @@ public class GridServiceProxy<T> implements Serializable {
* @param svcName Service name.
* @param argTypes Argument types.
* @param args Arguments for invocation.
+ * @param callCtx Service call context.
*/
- private ServiceProxyCallable(String mtdName, String svcName, Class<?>[] argTypes, Object[] args) {
+ private ServiceProxyCallable(
+ String mtdName,
+ String svcName,
+ Class<?>[] argTypes,
+ Object[] args,
+ @Nullable ServiceCallContext callCtx
+ ) {
this.mtdName = mtdName;
this.svcName = svcName;
this.argTypes = argTypes;
this.args = args;
+ this.callCtx = callCtx;
}
/** {@inheritDoc} */
@@ -486,7 +547,7 @@ public class GridServiceProxy<T> implements Serializable {
throw new GridServiceMethodNotFoundException(svcName, mtdName, argTypes);
try {
- return mtd.invoke(srv, args);
+ return callServiceMethod(srv, mtd, args, callCtx);
}
catch (InvocationTargetException e) {
throw new ServiceProxyException(e.getCause());
@@ -499,6 +560,13 @@ public class GridServiceProxy<T> implements Serializable {
U.writeString(out, mtdName);
U.writeArray(out, argTypes);
U.writeArray(out, args);
+
+ if (callCtx != null) {
+ out.writeBoolean(true);
+ callCtx.writeExternal(out);
+ }
+ else
+ out.writeBoolean(false);
}
/** {@inheritDoc} */
@@ -507,6 +575,11 @@ public class GridServiceProxy<T> implements Serializable {
mtdName = U.readString(in);
argTypes = U.readClassArray(in);
args = U.readArray(in);
+
+ if (in.readBoolean()) {
+ callCtx = new ServiceCallContextImpl();
+ callCtx.readExternal(in);
+ }
}
/** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java
index c353460..8692002 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java
@@ -39,6 +39,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.ignite.IgniteCheckedException;
@@ -78,6 +79,7 @@ import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.marshaller.jdk.JdkMarshaller;
import org.apache.ignite.plugin.security.SecurityPermission;
import org.apache.ignite.services.Service;
+import org.apache.ignite.services.ServiceCallContext;
import org.apache.ignite.services.ServiceConfiguration;
import org.apache.ignite.services.ServiceContext;
import org.apache.ignite.services.ServiceDeploymentException;
@@ -944,9 +946,14 @@ public class IgniteServiceProcessor extends ServiceProcessorAdapter implements I
}
/** {@inheritDoc} */
- @Override public <T> T serviceProxy(ClusterGroup prj, String name, Class<? super T> srvcCls, boolean sticky,
- long timeout)
- throws IgniteException {
+ @Override public <T> T serviceProxy(
+ ClusterGroup prj,
+ String name,
+ Class<? super T> srvcCls,
+ boolean sticky,
+ @Nullable Supplier<ServiceCallContext> callCtxProvider,
+ long timeout
+ ) throws IgniteException {
ctx.security().authorize(name, SecurityPermission.SERVICE_INVOKE);
if (hasLocalNode(prj)) {
@@ -955,7 +962,7 @@ public class IgniteServiceProcessor extends ServiceProcessorAdapter implements I
if (ctx != null) {
Service srvc = ctx.service();
- if (srvc != null) {
+ if (srvc != null && callCtxProvider == null) {
if (srvcCls.isAssignableFrom(srvc.getClass()))
return (T)srvc;
else if (!PlatformService.class.isAssignableFrom(srvc.getClass())) {
@@ -966,7 +973,7 @@ public class IgniteServiceProcessor extends ServiceProcessorAdapter implements I
}
}
- return new GridServiceProxy<T>(prj, name, srvcCls, sticky, timeout, ctx).proxy();
+ return new GridServiceProxy<T>(prj, name, srvcCls, sticky, timeout, ctx, callCtxProvider).proxy();
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceCallContextHolder.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceCallContextHolder.java
new file mode 100644
index 0000000..a9a449c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceCallContextHolder.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.service;
+
+import org.apache.ignite.services.ServiceCallContext;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Holder of the context of the service call.
+ */
+public class ServiceCallContextHolder {
+ /** Service call context of the current thread. */
+ private static final ThreadLocal<ServiceCallContext> locCallCtx = new ThreadLocal<>();
+
+ /**
+ * @return Service call context of the current thread.
+ */
+ @Nullable public static ServiceCallContext current() {
+ return locCallCtx.get();
+ }
+
+ /**
+ * @param callCtx Service call context of the current thread.
+ */
+ static void current(@Nullable ServiceCallContext callCtx) {
+ if (callCtx != null)
+ locCallCtx.set(callCtx);
+ else
+ locCallCtx.remove();
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceCallContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceCallContextImpl.java
new file mode 100644
index 0000000..aceedca
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceCallContextImpl.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.service;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.services.ServiceCallContext;
+
+/**
+ * Service call context implementation.
+ */
+public class ServiceCallContextImpl implements ServiceCallContext {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Service call context attributes. */
+ private Map<String, byte[]> attrs;
+
+ /**
+ * Default contructor.
+ */
+ public ServiceCallContextImpl() {
+ attrs = new HashMap<>();
+ }
+
+ /**
+ * @param attrs Service call context attributes.
+ */
+ public ServiceCallContextImpl(Map<String, byte[]> attrs) {
+ this.attrs = new HashMap<>(attrs);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String attribute(String name) {
+ byte[] bytes = attrs.get(name);
+
+ if (bytes == null)
+ return null;
+
+ return new String(bytes, StandardCharsets.UTF_8);
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte[] binaryAttribute(String name) {
+ return attrs.get(name);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ U.writeMap(out, attrs);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ attrs = U.readMap(in);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceContextImpl.java
index f624334..38acfce 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceContextImpl.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.services.Service;
+import org.apache.ignite.services.ServiceCallContext;
import org.apache.ignite.services.ServiceContext;
import org.jetbrains.annotations.Nullable;
@@ -154,6 +155,11 @@ public class ServiceContextImpl implements ServiceContext {
return mtd == NULL_METHOD ? null : mtd;
}
+ /** {@inheritDoc} */
+ @Override public ServiceCallContext currentCallContext() {
+ return ServiceCallContextHolder.current();
+ }
+
/**
* @param isCancelled Cancelled flag.
*/
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceProcessorAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceProcessorAdapter.java
index 2fea452..56628ac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceProcessorAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceProcessorAdapter.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.service;
import java.util.Collection;
import java.util.Map;
import java.util.UUID;
+import java.util.function.Supplier;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterGroup;
@@ -29,8 +30,10 @@ import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.services.Service;
+import org.apache.ignite.services.ServiceCallContext;
import org.apache.ignite.services.ServiceConfiguration;
import org.apache.ignite.services.ServiceDescriptor;
+import org.jetbrains.annotations.Nullable;
/**
* Adapter for different service processor implementations.
@@ -119,13 +122,20 @@ public abstract class ServiceProcessorAdapter extends GridProcessorAdapter {
* @param name Service name.
* @param srvcCls Service class.
* @param sticky Whether multi-node request should be done.
+ * @param callCtxProvider Caller context provider.
* @param timeout If greater than 0 limits service acquire time. Cannot be negative.
* @param <T> Service interface type.
* @return The proxy of a service by its name and class.
* @throws IgniteException If failed to create proxy.
*/
- public abstract <T> T serviceProxy(ClusterGroup prj, String name, Class<? super T> srvcCls, boolean sticky,
- long timeout) throws IgniteException;
+ public abstract <T> T serviceProxy(
+ ClusterGroup prj,
+ String name,
+ Class<? super T> srvcCls,
+ boolean sticky,
+ @Nullable Supplier<ServiceCallContext> callCtxProvider,
+ long timeout
+ ) throws IgniteException;
/**
* @param name Service name.
diff --git a/modules/core/src/main/java/org/apache/ignite/resources/ServiceResource.java b/modules/core/src/main/java/org/apache/ignite/resources/ServiceResource.java
index b76179d..b4ee8d3 100644
--- a/modules/core/src/main/java/org/apache/ignite/resources/ServiceResource.java
+++ b/modules/core/src/main/java/org/apache/ignite/resources/ServiceResource.java
@@ -88,4 +88,11 @@ public @interface ServiceResource {
* @return {@code True} if a sticky instance of a service proxy should be injected.
*/
public boolean proxySticky() default false;
+
+ /**
+ * Flag indicating that the service call context should be passed to the injected service.
+ *
+ * @return {@code True} if the service call context should be passed to the injected service.
+ */
+ public boolean forwardCallerContext() default false;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/services/ServiceCallContext.java b/modules/core/src/main/java/org/apache/ignite/services/ServiceCallContext.java
new file mode 100644
index 0000000..3830edc
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/services/ServiceCallContext.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.services;
+
+import java.io.Externalizable;
+import org.apache.ignite.lang.IgniteExperimental;
+
+/**
+ * Service call context.
+ * <p>
+ * This context is implicitly passed to the service and can be retrieved inside the service using {@link
+ * ServiceContext#currentCallContext()}. It is accessible only from the local thread during the execution of a service
+ * method.
+ * <p>
+ * Use {@link ServiceCallContext#builder()} to create the context builder.
+ * <p>
+ * <b>Note</b>: passing the context to the service may lead to performance overhead, so it should only be used for
+ * "middleware" tasks.
+ * <p>
+ * Usage example:
+ * <pre name="code" class="java">
+ *
+ * // Service implementation.
+ * class HelloServiceImpl implements HelloService {
+ * @ServiceContextResource
+ * ServiceContext ctx;
+ *
+ * public String call(Strig msg) {
+ * return msg + ctx.currentCallContext().attribute("user");
+ * }
+ * ...
+ * }
+ * ...
+ *
+ * // Call this service with context.
+ * ServiceCallContext callCtx = ServiceCallContext.builder().put("user", "John").build();
+ * HelloService helloSvc = ignite.services().serviceProxy("hello-service", HelloService.class, false, callCtx, 0);
+ * // Print "Hello John".
+ * System.out.println( helloSvc.call("Hello ") );
+ * </pre>
+ *
+ * @see ServiceContext
+ * @see ServiceCallContextBuilder
+ */
+@IgniteExperimental
+public interface ServiceCallContext extends Externalizable {
+ /**
+ * Create a context builder.
+ *
+ * @return Context builder.
+ */
+ public static ServiceCallContextBuilder builder() {
+ return new ServiceCallContextBuilder();
+ }
+
+ /**
+ * Get the string attribute.
+ *
+ * @param name Attribute name.
+ * @return String attribute value.
+ */
+ public String attribute(String name);
+
+ /**
+ * Get the binary attribute.
+ *
+ * @param name Attribute name.
+ * @return Binary attribute value.
+ */
+ public byte[] binaryAttribute(String name);
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/services/ServiceCallContextBuilder.java b/modules/core/src/main/java/org/apache/ignite/services/ServiceCallContextBuilder.java
new file mode 100644
index 0000000..bbb221a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/services/ServiceCallContextBuilder.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.services;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.ignite.internal.processors.service.ServiceCallContextImpl;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.lang.IgniteExperimental;
+
+/**
+ * Service call context builder.
+ */
+@IgniteExperimental
+public class ServiceCallContextBuilder {
+ /** Service call context attributes. */
+ private final Map<String, byte[]> attrs = new HashMap<>();
+
+ /**
+ * Put string attribute.
+ *
+ * @param name Attribute name.
+ * @param value Attribute value.
+ * @return This for chaining.
+ */
+ public ServiceCallContextBuilder put(String name, String value) {
+ A.notNullOrEmpty(name, "name");
+ A.notNull(value, "value");
+
+ attrs.put(name, value.getBytes(StandardCharsets.UTF_8));
+
+ return this;
+ }
+
+ /**
+ * Put binary attribute.
+ *
+ * @param name Attribute name.
+ * @param value Attribute value.
+ * @return This for chaining.
+ */
+ public ServiceCallContextBuilder put(String name, byte[] value) {
+ A.notNullOrEmpty(name, "name");
+ A.notNull(value, "value");
+
+ attrs.put(name, Arrays.copyOf(value, value.length));
+
+ return this;
+ }
+
+ /**
+ * @return Service call context.
+ */
+ public ServiceCallContext build() {
+ if (attrs.isEmpty())
+ throw new IllegalStateException("Cannot create an empty context.");
+
+ return new ServiceCallContextImpl(attrs);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/services/ServiceContext.java b/modules/core/src/main/java/org/apache/ignite/services/ServiceContext.java
index bebf77a..873acce 100644
--- a/modules/core/src/main/java/org/apache/ignite/services/ServiceContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/services/ServiceContext.java
@@ -67,4 +67,12 @@ public interface ServiceContext extends Serializable {
* @return Affinity key, possibly {@code null}.
*/
@Nullable public <K> K affinityKey();
+
+ /**
+ * Gets context of the current service call.
+ *
+ * @return Context of the current service call, possibly {@code null}.
+ * @see ServiceCallContext
+ */
+ @Nullable public ServiceCallContext currentCallContext();
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceCallContextTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceCallContextTest.java
new file mode 100644
index 0000000..20fbcb6
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceCallContextTest.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.service;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.resources.ServiceContextResource;
+import org.apache.ignite.resources.ServiceResource;
+import org.apache.ignite.services.Service;
+import org.apache.ignite.services.ServiceCallContext;
+import org.apache.ignite.services.ServiceConfiguration;
+import org.apache.ignite.services.ServiceContext;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Tests service caller context.
+ */
+@RunWith(Parameterized.class)
+public class IgniteServiceCallContextTest extends GridCommonAbstractTest {
+ /** String attribute name. */
+ private static final String STR_ATTR_NAME = "str.attr";
+
+ /** Binary attribute name. */
+ private static final String BIN_ATTR_NAME = "bin.attr";
+
+ /** Service name. */
+ private static final String SVC_NAME = "test-svc";
+
+ /** Injected service name. */
+ private static final String SVC_NAME_INJECTED = "test-svc-injected";
+
+ /** Nodes count. */
+ private static final int NODES_CNT = 3;
+
+ /** Max number of services per node. */
+ private static final int SVC_PER_NODE = 2;
+
+ /** Flag to deploy single service instance per cluster. */
+ @Parameterized.Parameter(0)
+ public boolean clusterSingleton;
+
+ /** Whether or not Ignite should always contact the same remote service instance. */
+ @Parameterized.Parameter(1)
+ public boolean sticky;
+
+ /** {@inheritDoc} */
+ @Override public void beforeTestsStarted() throws Exception {
+ startGrids(NODES_CNT - 1);
+ startClientGrid(NODES_CNT - 1);
+ }
+
+ /** */
+ @Parameterized.Parameters(name = "clusterSingleton={0}, sticky={1}")
+ public static Collection<?> parameters() {
+ return Arrays.asList(new Object[][] {
+ {false, false},
+ {false, true},
+ {true, true},
+ });
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ return super.getConfiguration(igniteInstanceName).setServiceConfiguration(
+ serviceCfg(SVC_NAME_INJECTED, true),
+ serviceCfg(SVC_NAME, clusterSingleton)
+ );
+ }
+
+ /**
+ * @param name Service name.
+ * @param clusterSingleton Flag to deploy single service instance per cluster.
+ * @return Service configuration.
+ */
+ private ServiceConfiguration serviceCfg(String name, boolean clusterSingleton) {
+ return new ServiceConfiguration()
+ .setName(name)
+ .setTotalCount(clusterSingleton ? 1 : NODES_CNT * SVC_PER_NODE)
+ .setMaxPerNodeCount(SVC_PER_NODE)
+ .setService(new TestServiceImpl());
+ }
+
+ /**
+ * Check context attribute.
+ */
+ @Test
+ public void testContextAttribute() {
+ for (int i = 0; i < NODES_CNT; i++) {
+ String strVal = String.valueOf(i);
+ byte[] binVal = strVal.getBytes();
+
+ TestService proxy = createProxyWithContext(grid(i), strVal, binVal);
+
+ assertEquals(strVal, proxy.attribute(false));
+ assertEquals(strVal, proxy.attribute(true));
+
+ assertTrue(Arrays.equals(binVal, proxy.binaryAttribute(false)));
+ assertTrue(Arrays.equals(binVal, proxy.binaryAttribute(true)));
+ }
+ }
+
+ /**
+ * Check context attribute concurrently.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testContextAttributeMultithreaded() throws Exception {
+ Map<TestService, T2<String, byte[]>> proxies = new HashMap<>();
+
+ for (int i = 0; i < G.allGrids().size(); i++) {
+ String strVal1 = String.valueOf(i * 2);
+ String strVal2 = String.valueOf(i * 2 + 1);
+ byte[] binVal1 = strVal1.getBytes();
+ byte[] binVal2 = strVal2.getBytes();
+
+ proxies.put(createProxyWithContext(grid(i), strVal1, binVal1), new T2<>(strVal1, binVal1));
+ proxies.put(createProxyWithContext(grid(i), strVal2, binVal2), new T2<>(strVal2, binVal2));
+ }
+
+ int threadsPerProxy = 2;
+ CyclicBarrier barrier = new CyclicBarrier(proxies.size() * threadsPerProxy);
+ GridCompoundFuture<Long, Long> compFut = new GridCompoundFuture<>();
+
+ for (Map.Entry<TestService, T2<String, byte[]>> e : proxies.entrySet()) {
+ IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(() -> {
+ barrier.await(getTestTimeout(), TimeUnit.MILLISECONDS);
+
+ for (int i = 0; i < 100; i++) {
+ T2<String, byte[]> expVals = e.getValue();
+ TestService proxy = e.getKey();
+
+ assertEquals(expVals.get1(), proxy.attribute(false));
+ assertEquals(expVals.get1(), proxy.attribute(true));
+
+ assertTrue(Arrays.equals(expVals.get2(), proxy.binaryAttribute(false)));
+ assertTrue(Arrays.equals(expVals.get2(), proxy.binaryAttribute(true)));
+ }
+
+ return true;
+ }, threadsPerProxy, "worker");
+
+ compFut.add(fut);
+ }
+
+ compFut.markInitialized();
+
+ compFut.get(getTestTimeout());
+ }
+
+ /**
+ * @param node Ignite node.
+ * @param attrVal String attribute value.
+ * @param binVal Binary attribute value.
+ * @return Service proxy instance.
+ */
+ private TestService createProxyWithContext(Ignite node, String attrVal, byte[] binVal) {
+ ServiceCallContext callCtx = ServiceCallContext.builder()
+ .put(STR_ATTR_NAME, attrVal)
+ .put(BIN_ATTR_NAME, binVal)
+ .build();
+
+ return node.services().serviceProxy(SVC_NAME, TestService.class, sticky, callCtx);
+ }
+
+ /** */
+ private static interface TestService extends Service {
+ /**
+ * @param useInjectedSvc Get attribute from the injected service.
+ * @return Context attribute value.
+ */
+ public String attribute(boolean useInjectedSvc);
+
+ /**
+ * @param useInjectedSvc Get attribute from the injected service.
+ * @return Context attribute value.
+ */
+ public byte[] binaryAttribute(boolean useInjectedSvc);
+ }
+
+ /** */
+ private static class TestServiceImpl implements TestService {
+ /** Injected service. */
+ @ServiceResource(serviceName = SVC_NAME_INJECTED, proxyInterface = TestService.class, forwardCallerContext = true)
+ private TestService injected;
+
+ /** Service context. */
+ @ServiceContextResource
+ private ServiceContext ctx;
+
+ /** {@inheritDoc} */
+ @Override public String attribute(boolean useInjectedSvc) {
+ assert injected != null;
+
+ ServiceCallContext callCtx = ctx.currentCallContext();
+
+ return useInjectedSvc ? injected.attribute(false) : callCtx.attribute(STR_ATTR_NAME);
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte[] binaryAttribute(boolean useInjectedSvc) {
+ assert injected != null;
+
+ ServiceCallContext callCtx = ctx.currentCallContext();
+
+ return useInjectedSvc ? injected.binaryAttribute(false) : callCtx.binaryAttribute(BIN_ATTR_NAME);
+ }
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteServiceGridTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteServiceGridTestSuite.java
index ef0c378..64622ad 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteServiceGridTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteServiceGridTestSuite.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.processors.service.GridServiceProxyNodeStopSel
import org.apache.ignite.internal.processors.service.GridServiceProxyTopologyInitializationTest;
import org.apache.ignite.internal.processors.service.GridServiceReassignmentSelfTest;
import org.apache.ignite.internal.processors.service.GridServiceSerializationSelfTest;
+import org.apache.ignite.internal.processors.service.IgniteServiceCallContextTest;
import org.apache.ignite.internal.processors.service.IgniteServiceDeployment2ClassLoadersDefaultMarshallerTest;
import org.apache.ignite.internal.processors.service.IgniteServiceDeployment2ClassLoadersJdkMarshallerTest;
import org.apache.ignite.internal.processors.service.IgniteServiceDeployment2ClassLoadersOptimizedMarshallerTest;
@@ -119,6 +120,7 @@ import org.junit.runners.Suite;
GridServiceProxyTopologyInitializationTest.class,
GridServiceDeployClusterReadOnlyModeTest.class,
GridServiceClusterReadOnlyModeTest.class,
+ IgniteServiceCallContextTest.class,
})
public class IgniteServiceGridTestSuite {
/** */